Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions docs/servers/tasks.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,8 @@ Background tasks require explicit opt-in:
| Environment Variable | Default | Description |
|---------------------|---------|-------------|
| `FASTMCP_ENABLE_TASKS` | `false` | Enable the MCP task protocol |
| `FASTMCP_ENABLE_DOCKET` | `false` | Enable the Docket task system |
| `FASTMCP_DOCKET_URL` | `memory://` | Backend URL (`memory://` or `redis://host:port/db`) |

Both `ENABLE_TASKS` and `ENABLE_DOCKET` must be `true` for background tasks to work.

You can also set a server-wide default in the constructor:

```python
Expand Down
3 changes: 0 additions & 3 deletions examples/tasks/.envrc
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@
# This file is loaded by direnv (https://direnv.net/) when you cd into this directory
# Run `direnv allow` to enable automatic environment loading

# Enable Docket support for background task execution
export FASTMCP_ENABLE_DOCKET=true

# Enable MCP SEP-1686 task protocol support
export FASTMCP_ENABLE_TASKS=true

Expand Down
1 change: 0 additions & 1 deletion examples/tasks/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ fastmcp tasks worker server.py

| Variable | Default | Description |
|----------|---------|-------------|
| `FASTMCP_ENABLE_DOCKET` | `false` | Enable Docket task system |
| `FASTMCP_ENABLE_TASKS` | `false` | Enable MCP task protocol (SEP-1686) |
| `FASTMCP_DOCKET_URL` | `memory://` | Docket backend URL |

Expand Down
22 changes: 7 additions & 15 deletions src/fastmcp/cli/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,17 @@
)


def check_docket_enabled() -> None:
"""Check if Docket is enabled with a distributed backend.
def check_distributed_backend() -> None:
"""Check if Docket is configured with a distributed backend.

The CLI worker runs as a separate process, so it needs Redis/Valkey
to coordinate with the main server process.

Raises:
SystemExit: If Docket isn't enabled or using memory:// URL
SystemExit: If using memory:// URL
"""
import fastmcp

# Check if Docket is enabled
if not fastmcp.settings.enable_docket:
console.print(
"[bold red]✗ Docket not enabled[/bold red]\n\n"
"Docket task support is not enabled.\n\n"
"To enable Docket, set the environment variable:\n"
" [cyan]export FASTMCP_ENABLE_DOCKET=true[/cyan]\n\n"
"Then try again."
)
sys.exit(1)

docket_url = fastmcp.settings.docket.url

# Check for memory:// URL and provide helpful error
Expand Down Expand Up @@ -86,7 +78,7 @@ def worker(
"""
import fastmcp

check_docket_enabled()
check_distributed_backend()

# Load server to get task functions
try:
Expand Down
2 changes: 1 addition & 1 deletion src/fastmcp/client/transports.py
Original file line number Diff line number Diff line change
Expand Up @@ -859,7 +859,7 @@ async def connect_session(

experimental_capabilities = {}
if fastmcp.settings.enable_tasks:
# Declare SEP-1686 task support (enable_tasks requires enable_docket via validator)
# Declare SEP-1686 task support
experimental_capabilities["tasks"] = {
"tools": True,
"prompts": True,
Expand Down
42 changes: 9 additions & 33 deletions src/fastmcp/server/dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,21 +297,12 @@ class _CurrentDocket(Dependency):
"""Internal dependency class for CurrentDocket."""

async def __aenter__(self) -> Docket:
import fastmcp

# Check if flag is enabled
if not fastmcp.settings.enable_docket:
raise RuntimeError(
"Docket support is not enabled. "
"Set FASTMCP_ENABLE_DOCKET=true to enable Docket support."
)

# Get Docket from ContextVar (set by _docket_lifespan)
docket = _current_docket.get()
if docket is None:
raise RuntimeError(
"No Docket instance found. This should not happen when "
"FASTMCP_ENABLE_DOCKET is enabled."
"No Docket instance found. Docket is only available within "
"a running FastMCP server context."
)

return docket
Expand All @@ -321,16 +312,13 @@ def CurrentDocket() -> Docket:
"""Get the current Docket instance managed by FastMCP.

This dependency provides access to the Docket instance that FastMCP
automatically creates when Docket support is enabled.

Requires:
- FASTMCP_ENABLE_DOCKET=true
automatically creates for background task scheduling.

Returns:
A dependency that resolves to the active Docket instance

Raises:
RuntimeError: If flag not enabled (during resolution)
RuntimeError: If not within a FastMCP server context

Example:
```python
Expand All @@ -349,19 +337,11 @@ class _CurrentWorker(Dependency):
"""Internal dependency class for CurrentWorker."""

async def __aenter__(self) -> Worker:
import fastmcp

if not fastmcp.settings.enable_docket:
raise RuntimeError(
"Docket support is not enabled. "
"Set FASTMCP_ENABLE_DOCKET=true to enable Docket support."
)

worker = _current_worker.get()
if worker is None:
raise RuntimeError(
"No Worker instance found. This should not happen when "
"FASTMCP_ENABLE_DOCKET is enabled."
"No Worker instance found. Worker is only available within "
"a running FastMCP server context."
)

return worker
Expand All @@ -371,16 +351,13 @@ def CurrentWorker() -> Worker:
"""Get the current Docket Worker instance managed by FastMCP.

This dependency provides access to the Worker instance that FastMCP
automatically creates when Docket support is enabled.

Requires:
- FASTMCP_ENABLE_DOCKET=true
automatically creates for background task processing.

Returns:
A dependency that resolves to the active Worker instance

Raises:
RuntimeError: If flag not enabled (during resolution)
RuntimeError: If not within a FastMCP server context

Example:
```python
Expand Down Expand Up @@ -463,8 +440,7 @@ async def __aenter__(self) -> DocketProgress:
docket = _current_docket.get()
if docket is None:
raise RuntimeError(
"Progress dependency requires Docket to be enabled. "
"Set FASTMCP_ENABLE_DOCKET=true"
"Progress dependency requires a FastMCP server context."
) from None

# Return in-memory progress for immediate execution
Expand Down
94 changes: 48 additions & 46 deletions src/fastmcp/server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from __future__ import annotations

import asyncio
import inspect
import re
import secrets
Expand All @@ -19,6 +20,7 @@
AbstractAsyncContextManager,
AsyncExitStack,
asynccontextmanager,
suppress,
)
from dataclasses import dataclass
from functools import partial
Expand Down Expand Up @@ -213,6 +215,7 @@ def __init__(
self._lifespan: LifespanCallable[LifespanResultT] = lifespan or default_lifespan
self._lifespan_result: LifespanResultT | None = None
self._lifespan_result_set: bool = False
self._started: asyncio.Event = asyncio.Event()

# Generate random ID if no name provided
self._mcp_server: LowLevelServer[LifespanResultT, Any] = LowLevelServer[
Expand Down Expand Up @@ -374,34 +377,16 @@ def docket(self) -> Docket | None:
return self._docket

@asynccontextmanager
async def _docket_lifespan(
self, user_lifespan_result: LifespanResultT
) -> AsyncIterator[LifespanResultT]:
"""Manage Docket instance and Worker when experimental support is enabled.

Args:
user_lifespan_result: The result from the user's lifespan function

Yields:
User's lifespan result (Docket is managed via ContextVar, not lifespan result)
"""
async def _docket_lifespan(self) -> AsyncIterator[None]:
"""Manage Docket instance and Worker for background task execution."""
from fastmcp import settings
from fastmcp.server.dependencies import _current_docket, _current_worker

# Validate configuration
if settings.enable_tasks and not settings.enable_docket:
raise RuntimeError(
"Server requires enable_docket=True when enable_tasks=True. "
"Task protocol support needs Docket for background execution."
)

if not settings.enable_docket:
# Docket support not enabled, pass through user lifespan result
yield user_lifespan_result
return

# Set FastMCP server in ContextVar so CurrentFastMCP can access it (use weakref to avoid reference cycles)
from fastmcp.server.dependencies import _current_server
from fastmcp.server.dependencies import (
_current_docket,
_current_server,
_current_worker,
)

server_token = _current_server.set(weakref.ref(self))

Expand All @@ -414,9 +399,10 @@ async def _docket_lifespan(
# Store on server instance for cross-task access (FastMCPTransport)
self._docket = docket

# Register task-enabled tools/prompts/resources with Docket
tools = await self.get_tools()
for tool in tools.values():
# Register local task-enabled tools/prompts/resources with Docket
for tool in self._tool_manager._tools.values():
if not hasattr(tool, "fn"):
continue
supports_task = (
tool.task
if tool.task is not None
Expand All @@ -425,8 +411,9 @@ async def _docket_lifespan(
if supports_task:
docket.register(tool.fn)

prompts = await self.get_prompts()
for prompt in prompts.values():
for prompt in self._prompt_manager._prompts.values():
if not hasattr(prompt, "fn"):
continue
supports_task = (
prompt.task
if prompt.task is not None
Expand All @@ -435,8 +422,9 @@ async def _docket_lifespan(
if supports_task:
docket.register(prompt.fn)

resources = await self.get_resources()
for resource in resources.values():
for resource in self._resource_manager._resources.values():
if not hasattr(resource, "fn"):
continue
supports_task = (
resource.task
if resource.task is not None
Expand All @@ -445,6 +433,17 @@ async def _docket_lifespan(
if supports_task:
docket.register(resource.fn)

for template in self._resource_manager._templates.values():
if not hasattr(template, "fn"):
continue
supports_task = (
template.task
if template.task is not None
else self._support_tasks_by_default
)
if supports_task:
docket.register(template.fn)

# Set Docket in ContextVar so CurrentDocket can access it
docket_token = _current_docket.set(docket)
try:
Expand All @@ -457,22 +456,21 @@ async def _docket_lifespan(
if settings.docket.worker_name:
worker_kwargs["name"] = settings.docket.worker_name

# Create and start Worker, then task group for run_forever()
async with (
Worker(docket, **worker_kwargs) as worker, # type: ignore[arg-type]
anyio.create_task_group() as tg,
):
# Create and start Worker
async with Worker(docket, **worker_kwargs) as worker: # type: ignore[arg-type]
# Set Worker in ContextVar so CurrentWorker can access it
worker_token = _current_worker.set(worker)
try:
# Start worker as background task
tg.start_soon(worker.run_forever)

worker_task = asyncio.create_task(worker.run_forever())
try:
yield user_lifespan_result
yield
finally:
# Cancel task group when exiting (cancels worker)
tg.cancel_scope.cancel()
# Cancel worker task on exit with timeout to prevent hanging
worker_task.cancel()
with suppress(
asyncio.CancelledError, asyncio.TimeoutError
):
await asyncio.wait_for(worker_task, timeout=2.0)
finally:
_current_worker.reset(worker_token)
finally:
Expand All @@ -492,9 +490,9 @@ async def _lifespan_manager(self) -> AsyncIterator[None]:

async with (
self._lifespan(self) as user_lifespan_result,
self._docket_lifespan(user_lifespan_result) as lifespan_result,
self._docket_lifespan(),
):
self._lifespan_result = lifespan_result
self._lifespan_result = user_lifespan_result
self._lifespan_result_set = True

async with AsyncExitStack[bool | None]() as stack:
Expand All @@ -503,7 +501,11 @@ async def _lifespan_manager(self) -> AsyncIterator[None]:
cm=server.server._lifespan_manager()
)

yield
self._started.set()
try:
yield
finally:
self._started.clear()

self._lifespan_result_set = False
self._lifespan_result = None
Expand Down
4 changes: 2 additions & 2 deletions src/fastmcp/server/tasks/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ async def handle_tool_as_task(
raise McpError(
ErrorData(
code=INTERNAL_ERROR,
message="Background tasks require Docket. Set FASTMCP_ENABLE_DOCKET=true",
message="Background tasks require a running FastMCP server context",
)
)

Expand Down Expand Up @@ -169,7 +169,7 @@ async def handle_prompt_as_task(
raise McpError(
ErrorData(
code=INTERNAL_ERROR,
message="Background tasks require Docket. Set FASTMCP_ENABLE_DOCKET=true",
message="Background tasks require a running FastMCP server context",
)
)

Expand Down
Loading
Loading