Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ dependencies = [
"mcp>=1.24.0,<2.0",
"openapi-pydantic>=0.5.1",
"platformdirs>=4.0.0",
"pydocket>=0.16.4",
"pydocket>=0.16.6",
"rich>=13.9.4",
"cyclopts>=4.0.0",
"authlib>=1.6.5",
Expand Down
32 changes: 28 additions & 4 deletions src/fastmcp/server/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,21 +185,45 @@ async def __aenter__(self) -> Context:
self._tokens.append(token)

# Set current server for dependency injection (use weakref to avoid reference cycles)
from fastmcp.server.dependencies import _current_server
from fastmcp.server.dependencies import (
_current_docket,
_current_server,
_current_worker,
)

self._server_token = _current_server.set(weakref.ref(self.fastmcp))

# Set docket/worker from server instance for this request's context.
# This ensures ContextVars work even in environments (like Lambda) where
# lifespan ContextVars don't propagate to request handlers.
server = self.fastmcp
if server._docket is not None:
self._docket_token = _current_docket.set(server._docket)

if server._worker is not None:
self._worker_token = _current_worker.set(server._worker)

return self

async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
"""Exit the context manager and reset the most recent token."""
# Flush any remaining notifications before exiting
await self._flush_notifications()

# Reset server token
if hasattr(self, "_server_token"):
from fastmcp.server.dependencies import _current_server
# Reset server/docket/worker tokens
from fastmcp.server.dependencies import (
_current_docket,
_current_server,
_current_worker,
)

if hasattr(self, "_worker_token"):
_current_worker.reset(self._worker_token)
delattr(self, "_worker_token")
if hasattr(self, "_docket_token"):
_current_docket.reset(self._docket_token)
delattr(self, "_docket_token")
if hasattr(self, "_server_token"):
_current_server.reset(self._server_token)
delattr(self, "_server_token")

Expand Down
11 changes: 7 additions & 4 deletions src/fastmcp/server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,9 @@ def __init__(
# Resolve server default for background task support
self._support_tasks_by_default: bool = tasks if tasks is not None else False

# Docket instance (set during lifespan for cross-task access)
# Docket and Worker instances (set during lifespan for cross-task access)
self._docket = None
self._worker = None

self._additional_http_routes: list[BaseRoute] = []
self._mounted_servers: list[MountedServer] = []
Expand Down Expand Up @@ -468,6 +469,8 @@ async def _docket_lifespan(self) -> AsyncIterator[None]:

# Create and start Worker
async with Worker(docket, **worker_kwargs) as worker: # type: ignore[arg-type]
# Store on server instance for cross-context access
self._worker = worker
# Set Worker in ContextVar so CurrentWorker can access it
worker_token = _current_worker.set(worker)
try:
Expand All @@ -480,13 +483,11 @@ async def _docket_lifespan(self) -> AsyncIterator[None]:
await worker_task
finally:
_current_worker.reset(worker_token)
self._worker = None
finally:
# Reset ContextVar
_current_docket.reset(docket_token)
# Clear instance attribute
self._docket = None
finally:
# Reset server ContextVar
_current_server.reset(server_token)

async def _register_mounted_server_functions(
Expand Down Expand Up @@ -564,6 +565,8 @@ async def _register_mounted_server_functions(
@asynccontextmanager
async def _lifespan_manager(self) -> AsyncIterator[None]:
if self._lifespan_result_set:
# Lifespan already ran - ContextVars will be set by Context.__aenter__
# at request time, so we just yield here.
yield
return

Expand Down
5 changes: 4 additions & 1 deletion src/fastmcp/server/tasks/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ async def handle_tool_as_task(
ctx = get_context()
session_id = ctx.session_id

# Get Docket from ContextVar (set by Context.__aenter__ at request time)
docket = _current_docket.get()
if docket is None:
raise McpError(
Expand Down Expand Up @@ -167,6 +168,7 @@ async def handle_prompt_as_task(
ctx = get_context()
session_id = ctx.session_id

# Get Docket from ContextVar (set by Context.__aenter__ at request time)
docket = _current_docket.get()
if docket is None:
raise McpError(
Expand Down Expand Up @@ -276,12 +278,13 @@ async def handle_resource_as_task(
ctx = get_context()
session_id = ctx.session_id

# Get Docket from ContextVar (set by Context.__aenter__ at request time)
docket = _current_docket.get()
if docket is None:
raise McpError(
ErrorData(
code=INTERNAL_ERROR,
message="Background tasks require Docket",
message="Background tasks require a running FastMCP server context",
)
)

Expand Down
2 changes: 1 addition & 1 deletion src/fastmcp/server/tasks/subscriptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ async def subscribe_to_task_updates(
)

except Exception as e:
logger.warning(f"Subscription task failed for {task_id}: {e}", exc_info=True)
logger.error(f"subscribe_to_task_updates failed for {task_id}: {e}")


async def _send_status_notification(
Expand Down
8 changes: 4 additions & 4 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.