Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions docs/servers/tasks.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,32 @@ The boolean shortcuts map to these modes:
- `task=True` → `TaskConfig(mode="optional")`
- `task=False` → `TaskConfig(mode="forbidden")`

### Poll Interval

<VersionBadge version="2.15.0" />

When clients poll for task status, the server tells them how frequently to check back. By default, FastMCP suggests a 5-second interval, but you can customize this per component:

```python
from datetime import timedelta
from fastmcp import FastMCP
from fastmcp.server.tasks import TaskConfig

mcp = FastMCP("MyServer")

# Poll every 2 seconds for a fast-completing task
@mcp.tool(task=TaskConfig(mode="optional", poll_interval=timedelta(seconds=2)))
async def quick_task() -> str:
return "Done quickly"

# Poll every 30 seconds for a long-running task
@mcp.tool(task=TaskConfig(mode="optional", poll_interval=timedelta(seconds=30)))
async def slow_task() -> str:
return "Eventually done"
```

Shorter intervals give clients faster feedback but increase server load. Longer intervals reduce load but delay status updates.

### Server-Wide Default

To enable background task support for all components by default, pass `tasks=True` to the constructor. Individual decorators can still override this with `task=False`.
Expand Down
2 changes: 2 additions & 0 deletions src/fastmcp/server/tasks/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import inspect
from collections.abc import Callable
from dataclasses import dataclass
from datetime import timedelta
from typing import Any, Literal

# Task execution modes per SEP-1686 / MCP ToolExecution.taskSupport
Expand Down Expand Up @@ -46,6 +47,7 @@ async def flexible_task(): ...
"""

mode: TaskMode = "optional"
poll_interval: timedelta = timedelta(seconds=5)

@classmethod
def from_bool(cls, value: bool) -> TaskConfig:
Expand Down
8 changes: 6 additions & 2 deletions src/fastmcp/server/tasks/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,18 @@ async def submit_to_docket(
# Build full task key with embedded metadata
task_key = build_task_key(session_id, server_task_id, task_type, key)

# Store task key mapping and creation timestamp in Redis for protocol handlers
# Store task metadata in Redis for protocol handlers
redis_key = f"fastmcp:task:{session_id}:{server_task_id}"
created_at_key = f"fastmcp:task:{session_id}:{server_task_id}:created_at"
poll_interval_key = f"fastmcp:task:{session_id}:{server_task_id}:poll_interval"
ttl_seconds = int(
docket.execution_ttl.total_seconds() + TASK_MAPPING_TTL_BUFFER_SECONDS
)
poll_interval_ms = int(component.task_config.poll_interval.total_seconds() * 1000)
async with docket.redis() as redis:
await redis.set(redis_key, task_key, ex=ttl_seconds)
await redis.set(created_at_key, created_at.isoformat(), ex=ttl_seconds)
await redis.set(poll_interval_key, str(poll_interval_ms), ex=ttl_seconds)

# Send notifications/tasks/created per SEP-1686 (mandatory)
# Send BEFORE queuing to avoid race where task completes before notification
Expand Down Expand Up @@ -126,6 +129,7 @@ async def submit_to_docket(
task_key,
ctx.session,
docket,
poll_interval_ms,
)

# Return CreateTaskResult with proper Task object
Expand All @@ -137,6 +141,6 @@ async def submit_to_docket(
createdAt=created_at,
lastUpdatedAt=created_at,
ttl=int(docket.execution_ttl.total_seconds() * 1000),
pollInterval=1000,
pollInterval=poll_interval_ms,
)
)
28 changes: 24 additions & 4 deletions src/fastmcp/server/tasks/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,17 +71,27 @@ async def tasks_get_handler(server: FastMCP, params: dict[str, Any]) -> GetTaskR
)
)

# Look up full task key and creation timestamp from Redis
# Look up task metadata from Redis
redis_key = f"fastmcp:task:{session_id}:{client_task_id}"
created_at_key = f"fastmcp:task:{session_id}:{client_task_id}:created_at"
poll_interval_key = f"fastmcp:task:{session_id}:{client_task_id}:poll_interval"
async with docket.redis() as redis:
task_key_bytes = await redis.get(redis_key)
created_at_bytes = await redis.get(created_at_key)
poll_interval_bytes = await redis.get(poll_interval_key)

task_key = None if task_key_bytes is None else task_key_bytes.decode("utf-8")
created_at = (
None if created_at_bytes is None else created_at_bytes.decode("utf-8")
)
try:
poll_interval_ms = (
int(poll_interval_bytes.decode("utf-8"))
if poll_interval_bytes
else 5000 # Default to 5 seconds
)
except (ValueError, UnicodeDecodeError):
poll_interval_ms = 5000

if task_key is None:
# Task not found - raise error per MCP protocol
Expand Down Expand Up @@ -129,7 +139,7 @@ async def tasks_get_handler(server: FastMCP, params: dict[str, Any]) -> GetTaskR
createdAt=created_at, # type: ignore[arg-type]
lastUpdatedAt=datetime.now(timezone.utc),
ttl=60000,
pollInterval=1000,
pollInterval=poll_interval_ms,
statusMessage=status_message,
)

Expand Down Expand Up @@ -344,17 +354,27 @@ async def tasks_cancel_handler(
)
)

# Look up full task key and creation timestamp from Redis
# Look up task metadata from Redis
redis_key = f"fastmcp:task:{session_id}:{client_task_id}"
created_at_key = f"fastmcp:task:{session_id}:{client_task_id}:created_at"
poll_interval_key = f"fastmcp:task:{session_id}:{client_task_id}:poll_interval"
async with docket.redis() as redis:
task_key_bytes = await redis.get(redis_key)
created_at_bytes = await redis.get(created_at_key)
poll_interval_bytes = await redis.get(poll_interval_key)

task_key = None if task_key_bytes is None else task_key_bytes.decode("utf-8")
created_at = (
None if created_at_bytes is None else created_at_bytes.decode("utf-8")
)
try:
poll_interval_ms = (
int(poll_interval_bytes.decode("utf-8"))
if poll_interval_bytes
else 5000 # Default to 5 seconds
)
except (ValueError, UnicodeDecodeError):
poll_interval_ms = 5000

if task_key is None:
raise McpError(
Expand Down Expand Up @@ -386,6 +406,6 @@ async def tasks_cancel_handler(
createdAt=created_at or datetime.now(timezone.utc).isoformat(),
lastUpdatedAt=datetime.now(timezone.utc),
ttl=60_000,
pollInterval=1000,
pollInterval=poll_interval_ms,
statusMessage="Task cancelled",
)
12 changes: 10 additions & 2 deletions src/fastmcp/server/tasks/subscriptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ async def subscribe_to_task_updates(
task_key: str,
session: ServerSession,
docket: Docket,
poll_interval_ms: int = 5000,
) -> None:
"""Subscribe to Docket execution events and send MCP notifications.

Expand All @@ -41,6 +42,7 @@ async def subscribe_to_task_updates(
task_key: Internal Docket execution key (includes session, type, component)
session: MCP ServerSession for sending notifications
docket: Docket instance for subscribing to execution events
poll_interval_ms: Poll interval in milliseconds to include in notifications
"""
try:
execution = await docket.get_execution(task_key)
Expand All @@ -58,6 +60,7 @@ async def subscribe_to_task_updates(
task_key=task_key,
docket=docket,
state=event["state"], # type: ignore[typeddict-item]
poll_interval_ms=poll_interval_ms,
)
elif event["type"] == "progress":
# Send notification when progress message changes
Expand All @@ -67,6 +70,7 @@ async def subscribe_to_task_updates(
task_key=task_key,
docket=docket,
execution=execution,
poll_interval_ms=poll_interval_ms,
)

except Exception as e:
Expand All @@ -79,6 +83,7 @@ async def _send_status_notification(
task_key: str,
docket: Docket,
state: ExecutionState,
poll_interval_ms: int = 5000,
) -> None:
"""Send notifications/tasks/status to client.

Expand All @@ -91,6 +96,7 @@ async def _send_status_notification(
task_key: Internal task key (for metadata lookup)
docket: Docket instance
state: Docket execution state (enum)
poll_interval_ms: Poll interval in milliseconds
"""
# Map Docket state to MCP status
mcp_status = DOCKET_TO_MCP_STATE.get(state, "failed")
Expand Down Expand Up @@ -127,7 +133,7 @@ async def _send_status_notification(
"createdAt": created_at,
"lastUpdatedAt": datetime.now(timezone.utc).isoformat(),
"ttl": 60000,
"pollInterval": 1000,
"pollInterval": poll_interval_ms,
}

if status_message:
Expand All @@ -149,6 +155,7 @@ async def _send_progress_notification(
task_key: str,
docket: Docket,
execution: Execution,
poll_interval_ms: int = 5000,
) -> None:
"""Send notifications/tasks/status when progress updates.

Expand All @@ -158,6 +165,7 @@ async def _send_progress_notification(
task_key: Internal task key
docket: Docket instance
execution: Execution object with current progress
poll_interval_ms: Poll interval in milliseconds
"""
# Sync execution to get latest progress
await execution.sync()
Expand Down Expand Up @@ -192,7 +200,7 @@ async def _send_progress_notification(
"createdAt": created_at,
"lastUpdatedAt": datetime.now(timezone.utc).isoformat(),
"ttl": 60000,
"pollInterval": 1000,
"pollInterval": poll_interval_ms,
"statusMessage": execution.progress.message,
}

Expand Down
2 changes: 1 addition & 1 deletion tests/server/middleware/test_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ async def test_on_message_with_resource_template_in_payload(

assert get_log_lines(caplog) == snapshot(
[
'{"event": "request_start", "method": "test_method", "source": "client", "payload": "{\\"name\\":\\"tmpl\\",\\"title\\":null,\\"description\\":null,\\"icons\\":null,\\"tags\\":[],\\"meta\\":null,\\"enabled\\":true,\\"task_config\\":{\\"mode\\":\\"forbidden\\"},\\"uri_template\\":\\"tmpl://{id}\\",\\"mime_type\\":\\"text/plain\\",\\"parameters\\":{\\"id\\":{\\"type\\":\\"string\\"}},\\"annotations\\":null}", "payload_type": "ResourceTemplate"}',
'{"event": "request_start", "method": "test_method", "source": "client", "payload": "{\\"name\\":\\"tmpl\\",\\"title\\":null,\\"description\\":null,\\"icons\\":null,\\"tags\\":[],\\"meta\\":null,\\"enabled\\":true,\\"task_config\\":{\\"mode\\":\\"forbidden\\",\\"poll_interval\\":\\"PT5S\\"},\\"uri_template\\":\\"tmpl://{id}\\",\\"mime_type\\":\\"text/plain\\",\\"parameters\\":{\\"id\\":{\\"type\\":\\"string\\"}},\\"annotations\\":null}", "payload_type": "ResourceTemplate"}',
'{"event": "request_success", "method": "test_method", "source": "client", "duration_ms": 0.02}',
]
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
"""Tests for TaskConfig mode enforcement (SEP-1686).
"""Tests for TaskConfig (SEP-1686).

Tests that the server correctly enforces task execution modes:
- "forbidden": No task support, error if client requests task
- "optional": Supports both sync and task execution
- "required": Requires task execution, error if client doesn't request task
Tests for TaskConfig:
- Mode enforcement (forbidden, optional, required)
- Poll interval configuration
"""

from datetime import timedelta

import pytest
from mcp.shared.exceptions import McpError
from mcp.types import TextContent, ToolExecution
Expand Down Expand Up @@ -352,3 +353,41 @@ def sync_tool() -> str:
tool = await mcp._tool_manager.get_tool("sync_tool")
assert isinstance(tool, Tool)
assert tool.task_config.mode == "forbidden"


class TestPollIntervalConfiguration:
"""Test poll_interval configuration in TaskConfig."""

async def test_default_poll_interval_is_5_seconds(self):
"""Default poll_interval should be 5 seconds."""
config = TaskConfig()
assert config.poll_interval == timedelta(seconds=5)

async def test_custom_poll_interval_preserved(self):
"""Custom poll_interval should be preserved in TaskConfig."""
config = TaskConfig(poll_interval=timedelta(seconds=10))
assert config.poll_interval == timedelta(seconds=10)

async def test_tool_inherits_poll_interval(self):
"""Tool should inherit poll_interval from TaskConfig."""
mcp = FastMCP("test", tasks=False)

@mcp.tool(task=TaskConfig(mode="optional", poll_interval=timedelta(seconds=2)))
async def my_tool() -> str:
return "ok"

tool = await mcp._tool_manager.get_tool("my_tool")
assert isinstance(tool, Tool)
assert tool.task_config.poll_interval == timedelta(seconds=2)

async def test_task_true_uses_default_poll_interval(self):
"""task=True should use default 5 second poll_interval."""
mcp = FastMCP("test", tasks=False)

@mcp.tool(task=True)
async def my_tool() -> str:
return "ok"

tool = await mcp._tool_manager.get_tool("my_tool")
assert isinstance(tool, Tool)
assert tool.task_config.poll_interval == timedelta(seconds=5)
Loading
Loading