diff --git a/pyproject.toml b/pyproject.toml index 3010b826eb..431db8fed5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,7 +10,7 @@ dependencies = [ "mcp>=1.24.0,<2.0", "openapi-pydantic>=0.5.1", "platformdirs>=4.0.0", - "pydocket>=0.16.4", + "pydocket>=0.16.6", "rich>=13.9.4", "cyclopts>=4.0.0", "authlib>=1.6.5", diff --git a/src/fastmcp/server/context.py b/src/fastmcp/server/context.py index 8cd4f7307a..5370f04690 100644 --- a/src/fastmcp/server/context.py +++ b/src/fastmcp/server/context.py @@ -185,10 +185,24 @@ async def __aenter__(self) -> Context: self._tokens.append(token) # Set current server for dependency injection (use weakref to avoid reference cycles) - from fastmcp.server.dependencies import _current_server + from fastmcp.server.dependencies import ( + _current_docket, + _current_server, + _current_worker, + ) self._server_token = _current_server.set(weakref.ref(self.fastmcp)) + # Set docket/worker from server instance for this request's context. + # This ensures ContextVars work even in environments (like Lambda) where + # lifespan ContextVars don't propagate to request handlers. + server = self.fastmcp + if server._docket is not None: + self._docket_token = _current_docket.set(server._docket) + + if server._worker is not None: + self._worker_token = _current_worker.set(server._worker) + return self async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: @@ -196,10 +210,20 @@ async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: # Flush any remaining notifications before exiting await self._flush_notifications() - # Reset server token - if hasattr(self, "_server_token"): - from fastmcp.server.dependencies import _current_server + # Reset server/docket/worker tokens + from fastmcp.server.dependencies import ( + _current_docket, + _current_server, + _current_worker, + ) + if hasattr(self, "_worker_token"): + _current_worker.reset(self._worker_token) + delattr(self, "_worker_token") + if hasattr(self, "_docket_token"): + _current_docket.reset(self._docket_token) + delattr(self, "_docket_token") + if hasattr(self, "_server_token"): _current_server.reset(self._server_token) delattr(self, "_server_token") diff --git a/src/fastmcp/server/server.py b/src/fastmcp/server/server.py index 295e2aa70a..44ac0aab20 100644 --- a/src/fastmcp/server/server.py +++ b/src/fastmcp/server/server.py @@ -197,8 +197,9 @@ def __init__( # Resolve server default for background task support self._support_tasks_by_default: bool = tasks if tasks is not None else False - # Docket instance (set during lifespan for cross-task access) + # Docket and Worker instances (set during lifespan for cross-task access) self._docket = None + self._worker = None self._additional_http_routes: list[BaseRoute] = [] self._mounted_servers: list[MountedServer] = [] @@ -468,6 +469,8 @@ async def _docket_lifespan(self) -> AsyncIterator[None]: # Create and start Worker async with Worker(docket, **worker_kwargs) as worker: # type: ignore[arg-type] + # Store on server instance for cross-context access + self._worker = worker # Set Worker in ContextVar so CurrentWorker can access it worker_token = _current_worker.set(worker) try: @@ -480,13 +483,11 @@ async def _docket_lifespan(self) -> AsyncIterator[None]: await worker_task finally: _current_worker.reset(worker_token) + self._worker = None finally: - # Reset ContextVar _current_docket.reset(docket_token) - # Clear instance attribute self._docket = None finally: - # Reset server ContextVar _current_server.reset(server_token) async def _register_mounted_server_functions( @@ -564,6 +565,8 @@ async def _register_mounted_server_functions( @asynccontextmanager async def _lifespan_manager(self) -> AsyncIterator[None]: if self._lifespan_result_set: + # Lifespan already ran - ContextVars will be set by Context.__aenter__ + # at request time, so we just yield here. yield return diff --git a/src/fastmcp/server/tasks/handlers.py b/src/fastmcp/server/tasks/handlers.py index 3f528a0b59..ecfa0c8082 100644 --- a/src/fastmcp/server/tasks/handlers.py +++ b/src/fastmcp/server/tasks/handlers.py @@ -56,6 +56,7 @@ async def handle_tool_as_task( ctx = get_context() session_id = ctx.session_id + # Get Docket from ContextVar (set by Context.__aenter__ at request time) docket = _current_docket.get() if docket is None: raise McpError( @@ -167,6 +168,7 @@ async def handle_prompt_as_task( ctx = get_context() session_id = ctx.session_id + # Get Docket from ContextVar (set by Context.__aenter__ at request time) docket = _current_docket.get() if docket is None: raise McpError( @@ -276,12 +278,13 @@ async def handle_resource_as_task( ctx = get_context() session_id = ctx.session_id + # Get Docket from ContextVar (set by Context.__aenter__ at request time) docket = _current_docket.get() if docket is None: raise McpError( ErrorData( code=INTERNAL_ERROR, - message="Background tasks require Docket", + message="Background tasks require a running FastMCP server context", ) ) diff --git a/src/fastmcp/server/tasks/subscriptions.py b/src/fastmcp/server/tasks/subscriptions.py index 9bbf0fad16..c22e88f041 100644 --- a/src/fastmcp/server/tasks/subscriptions.py +++ b/src/fastmcp/server/tasks/subscriptions.py @@ -70,7 +70,7 @@ async def subscribe_to_task_updates( ) except Exception as e: - logger.warning(f"Subscription task failed for {task_id}: {e}", exc_info=True) + logger.error(f"subscribe_to_task_updates failed for {task_id}: {e}") async def _send_status_notification( diff --git a/uv.lock b/uv.lock index afb7d2323d..b17a98f839 100644 --- a/uv.lock +++ b/uv.lock @@ -752,7 +752,7 @@ requires-dist = [ { name = "platformdirs", specifier = ">=4.0.0" }, { name = "py-key-value-aio", extras = ["disk", "keyring", "memory"], specifier = ">=0.3.0,<0.4.0" }, { name = "pydantic", extras = ["email"], specifier = ">=2.11.7" }, - { name = "pydocket", specifier = ">=0.16.4" }, + { name = "pydocket", specifier = ">=0.16.6" }, { name = "pyperclip", specifier = ">=1.9.0" }, { name = "python-dotenv", specifier = ">=1.1.0" }, { name = "rich", specifier = ">=13.9.4" }, @@ -1775,7 +1775,7 @@ wheels = [ [[package]] name = "pydocket" -version = "0.16.4" +version = "0.16.6" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "cloudpickle" }, @@ -1792,9 +1792,9 @@ dependencies = [ { name = "typer" }, { name = "typing-extensions" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/4d/c6/eb7f3af72fa5c04b52a3f9390ff0c948987441987f9526dd992d2a6b3524/pydocket-0.16.4.tar.gz", hash = "sha256:d034d1ac75877560d86329fb3643e7b862fcbcdac407d876a62f5d9e386e8753", size = 297949, upload-time = "2026-01-08T21:58:31.637Z" } +sdist = { url = "https://files.pythonhosted.org/packages/72/00/26befe5f58df7cd1aeda4a8d10bc7d1908ffd86b80fd995e57a2a7b3f7bd/pydocket-0.16.6.tar.gz", hash = "sha256:b96c96ad7692827214ed4ff25fcf941ec38371314db5dcc1ae792b3e9d3a0294", size = 299054, upload-time = "2026-01-09T22:09:15.405Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/74/c5/e6ffed3902ead6cb906758749c42211f7f24ea6d8fdd772f531f5a81c9fa/pydocket-0.16.4-py3-none-any.whl", hash = "sha256:cdcdf74b987c2cd5d03c7353d15f8dd2ac9bd43f2a91f7441748d5a8ebd617c9", size = 67374, upload-time = "2026-01-08T21:58:30.01Z" }, + { url = "https://files.pythonhosted.org/packages/0a/3f/7483e5a6dc6326b6e0c640619b5c5bd1d6e3c20e54d58f5fb86267cef00e/pydocket-0.16.6-py3-none-any.whl", hash = "sha256:683d21e2e846aa5106274e7d59210331b242d7fb0dce5b08d3b82065663ed183", size = 67697, upload-time = "2026-01-09T22:09:13.436Z" }, ] [[package]]