Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 17 additions & 9 deletions src/fastmcp/server/providers/fastmcp_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from fastmcp.resources.resource import Resource, ResourceResult
from fastmcp.resources.template import ResourceTemplate
from fastmcp.server.providers.base import Provider
from fastmcp.server.tasks.config import TaskMeta
from fastmcp.tools.tool import Tool, ToolResult
from fastmcp.utilities.components import FastMCPComponent

Expand Down Expand Up @@ -85,24 +86,31 @@ def wrap(cls, server: Any, tool: Tool) -> FastMCPProviderTool:
)

async def _run(
self, arguments: dict[str, Any]
self,
arguments: dict[str, Any],
task_meta: TaskMeta | None = None,
) -> ToolResult | mcp.types.CreateTaskResult:
"""Skip task handling - delegate to run() which calls child middleware.
"""Delegate to child server's call_tool() with task_meta.

The actual underlying tool will check _task_metadata contextvar and
submit to Docket if appropriate. This wrapper just passes through.
Passes task_meta through to the child server so it can handle
backgrounding appropriately.
"""
return await self.run(arguments)
return await self._server.call_tool(
self._original_name, arguments, task_meta=task_meta
)

async def run(
self, arguments: dict[str, Any]
) -> ToolResult | mcp.types.CreateTaskResult: # type: ignore[override]
"""Delegate to child server's call_tool().
"""Not implemented - use _run() which delegates to child server.

This runs BEFORE any backgrounding decision - the actual underlying
tool will check contextvars and submit to Docket if appropriate.
FastMCPProviderTool._run() handles all execution by delegating
to the child server's call_tool() with task_meta.
"""
return await self._server.call_tool(self._original_name, arguments)
raise NotImplementedError(
"FastMCPProviderTool.run() should not be called directly. "
"Use _run() which delegates to the child server's call_tool()."
)


class FastMCPProviderResource(Resource):
Expand Down
69 changes: 47 additions & 22 deletions src/fastmcp/server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@
from fastmcp.server.middleware import Middleware, MiddlewareContext
from fastmcp.server.providers import LocalProvider, Provider
from fastmcp.server.tasks.capabilities import get_task_capabilities
from fastmcp.server.tasks.config import TaskConfig
from fastmcp.server.tasks.config import TaskConfig, TaskMeta
from fastmcp.settings import DuplicateBehavior as DuplicateBehaviorSetting
from fastmcp.settings import Settings
from fastmcp.tools.tool import FunctionTool, Tool, ToolResult
Expand Down Expand Up @@ -1126,12 +1126,33 @@ async def get_component(

raise NotFoundError(f"Unknown component: {key}")

@overload
async def call_tool(
self,
name: str,
arguments: dict[str, Any] | None = None,
*,
run_middleware: bool = True,
task_meta: None = None,
) -> ToolResult: ...

@overload
async def call_tool(
self,
name: str,
arguments: dict[str, Any] | None = None,
*,
run_middleware: bool = True,
task_meta: TaskMeta,
) -> mcp.types.CreateTaskResult: ...

async def call_tool(
self,
name: str,
arguments: dict[str, Any] | None = None,
*,
run_middleware: bool = True,
task_meta: TaskMeta | None = None,
) -> ToolResult | mcp.types.CreateTaskResult:
"""Call a tool by name.

Expand All @@ -1142,16 +1163,23 @@ async def call_tool(
arguments: Tool arguments (optional)
run_middleware: If True (default), apply the middleware chain.
Set to False when called from middleware to avoid re-applying.
task_meta: If provided, execute as a background task and return
CreateTaskResult. If None (default), execute synchronously and
return ToolResult.

Returns:
ToolResult with content and optional structured_content.
May return CreateTaskResult if called in MCP context with task metadata.
ToolResult when task_meta is None.
CreateTaskResult when task_meta is provided.

Raises:
NotFoundError: If tool not found or disabled
ToolError: If tool execution fails
ValidationError: If arguments fail validation
"""
# Enrich task_meta with fn_key if task execution requested
if task_meta is not None and task_meta.fn_key is None:
task_meta = TaskMeta(ttl=task_meta.ttl, fn_key=Tool.make_key(name))

async with fastmcp.server.context.Context(fastmcp=self) as ctx:
if run_middleware:
mw_context = MiddlewareContext[CallToolRequestParams](
Expand All @@ -1169,6 +1197,7 @@ async def call_tool(
context.message.name,
context.message.arguments or {},
run_middleware=False,
task_meta=task_meta,
),
)

Expand All @@ -1177,7 +1206,7 @@ async def call_tool(
tool = await provider.get_tool(name)
if tool is not None and self._is_component_enabled(tool):
try:
return await tool._run(arguments or {})
return await tool._run(arguments or {}, task_meta=task_meta)
except FastMCPError:
logger.exception(f"Error calling tool {name!r}")
raise
Expand Down Expand Up @@ -1491,8 +1520,9 @@ async def _call_tool_mcp(
"""
Handle MCP 'callTool' requests.

Sets task metadata contextvar and calls call_tool(). The tool's _run() method
handles the backgrounding decision, ensuring middleware runs before Docket.
Extracts task metadata from MCP request context and passes it explicitly
to call_tool(). The tool's _run() method handles the backgrounding decision,
ensuring middleware runs before Docket.

Args:
key: The name of the tool to call
Expand All @@ -1501,35 +1531,30 @@ async def _call_tool_mcp(
Returns:
Tool result or CreateTaskResult for background execution
"""
from fastmcp.server.dependencies import _docket_fn_key, _task_metadata

logger.debug(
f"[{self.name}] Handler called: call_tool %s with %s", key, arguments
)

try:
# Extract SEP-1686 task metadata from request context
task_meta_dict: dict[str, Any] | None = None
task_meta: TaskMeta | None = None
try:
ctx = self._mcp_server.request_context
if ctx.experimental.is_task:
task_meta = ctx.experimental.task_metadata
task_meta_dict = task_meta.model_dump(exclude_none=True)
mcp_task_meta = ctx.experimental.task_metadata
task_meta_dict = mcp_task_meta.model_dump(exclude_none=True)
task_meta = TaskMeta(
ttl=task_meta_dict.get("ttl"),
fn_key=Tool.make_key(key),
)
except (AttributeError, LookupError):
pass

# Set contextvars so tool._run() can access them
task_token = _task_metadata.set(task_meta_dict)
key_token = _docket_fn_key.set(Tool.make_key(key))
try:
result = await self.call_tool(key, arguments)
result = await self.call_tool(key, arguments, task_meta=task_meta)

if isinstance(result, mcp.types.CreateTaskResult):
return result
return result.to_mcp_result()
finally:
_task_metadata.reset(task_token)
_docket_fn_key.reset(key_token)
if isinstance(result, mcp.types.CreateTaskResult):
return result
return result.to_mcp_result()

except DisabledError as e:
raise NotFoundError(f"Unknown tool: {key!r}") from e
Expand Down
3 changes: 2 additions & 1 deletion src/fastmcp/server/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"""

from fastmcp.server.tasks.capabilities import get_task_capabilities
from fastmcp.server.tasks.config import TaskConfig, TaskMode
from fastmcp.server.tasks.config import TaskConfig, TaskMeta, TaskMode
from fastmcp.server.tasks.keys import (
build_task_key,
get_client_task_id_from_key,
Expand All @@ -13,6 +13,7 @@

__all__ = [
"TaskConfig",
"TaskMeta",
"TaskMode",
"build_task_key",
"get_client_task_id_from_key",
Expand Down
16 changes: 16 additions & 0 deletions src/fastmcp/server/tasks/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,22 @@
DEFAULT_TTL_MS = 60_000 # Default TTL in milliseconds


@dataclass
class TaskMeta:
"""Metadata for task-augmented execution requests.

When passed to call_tool/read_resource/get_prompt, signals that
the operation should be submitted as a background task.

Attributes:
ttl: Client-requested TTL in milliseconds. If None, uses server default.
fn_key: Docket routing key. Auto-derived from component name if None.
"""

ttl: int | None = None
fn_key: str | None = None


@dataclass
class TaskConfig:
"""Configuration for MCP background task execution (SEP-1686).
Expand Down
27 changes: 17 additions & 10 deletions src/fastmcp/server/tasks/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from mcp.types import INTERNAL_ERROR, ErrorData

from fastmcp.server.dependencies import _current_docket, get_context
from fastmcp.server.tasks.config import TaskMeta
from fastmcp.server.tasks.keys import build_task_key

if TYPE_CHECKING:
Expand All @@ -32,6 +33,7 @@ async def submit_to_docket(
key: str,
component: Tool | Resource | ResourceTemplate | Prompt,
arguments: dict[str, Any] | None = None,
task_meta: TaskMeta | None = None,
) -> mcp.types.CreateTaskResult:
"""Submit any component to Docket for background execution (SEP-1686).

Expand All @@ -41,15 +43,13 @@ async def submit_to_docket(
Queues the component's method to Docket, stores raw return values,
and converts to MCP types on retrieval.

Note: Client-requested TTL in task_meta is intentionally ignored.
Server-side TTL policy (docket.execution_ttl) takes precedence for
consistent task lifecycle management.

Args:
task_type: Component type for task key construction
key: The component key as seen by MCP layer (with namespace prefix)
component: The component instance (Tool, Resource, ResourceTemplate, Prompt)
arguments: Arguments/params (None for Resource which has no args)
task_meta: Task execution metadata. If task_meta.ttl is provided, it
overrides the server default (docket.execution_ttl).

Returns:
CreateTaskResult: Task stub with proper Task object
Expand All @@ -61,9 +61,12 @@ async def submit_to_docket(
# Record creation timestamp per SEP-1686 final spec (line 430)
created_at = datetime.now(timezone.utc)

# Get session ID and Docket
# Get session ID - use "internal" for programmatic calls without MCP session
ctx = get_context()
session_id = ctx.session_id
try:
session_id = ctx.session_id
except RuntimeError:
session_id = "internal"

docket = _current_docket.get()
if docket is None:
Expand All @@ -77,13 +80,17 @@ async def submit_to_docket(
# Build full task key with embedded metadata
task_key = build_task_key(session_id, server_task_id, task_type, key)

# Determine TTL: use task_meta.ttl if provided, else docket default
if task_meta is not None and task_meta.ttl is not None:
ttl_ms = task_meta.ttl
else:
ttl_ms = int(docket.execution_ttl.total_seconds() * 1000)
ttl_seconds = int(ttl_ms / 1000) + TASK_MAPPING_TTL_BUFFER_SECONDS

# Store task metadata in Redis for protocol handlers
redis_key = f"fastmcp:task:{session_id}:{server_task_id}"
created_at_key = f"fastmcp:task:{session_id}:{server_task_id}:created_at"
poll_interval_key = f"fastmcp:task:{session_id}:{server_task_id}:poll_interval"
ttl_seconds = int(
docket.execution_ttl.total_seconds() + TASK_MAPPING_TTL_BUFFER_SECONDS
)
poll_interval_ms = int(component.task_config.poll_interval.total_seconds() * 1000)
async with docket.redis() as redis:
await redis.set(redis_key, task_key, ex=ttl_seconds)
Expand Down Expand Up @@ -140,7 +147,7 @@ async def submit_to_docket(
status="working",
createdAt=created_at,
lastUpdatedAt=created_at,
ttl=int(docket.execution_ttl.total_seconds() * 1000),
ttl=ttl_ms,
pollInterval=poll_interval_ms,
)
)
26 changes: 22 additions & 4 deletions src/fastmcp/server/tasks/routing.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from mcp.types import METHOD_NOT_FOUND, ErrorData

from fastmcp.server.dependencies import get_task_metadata
from fastmcp.server.tasks.config import TaskMeta
from fastmcp.server.tasks.handlers import submit_to_docket

if TYPE_CHECKING:
Expand All @@ -26,16 +27,21 @@
async def check_background_task(
component: Tool | Resource | ResourceTemplate | Prompt,
task_type: TaskType,
key: str,
# TODO: Remove `key` parameter when resources and prompts are updated to use
# explicit task_meta parameter like tools do
key: str | None = None,
arguments: dict[str, Any] | None = None,
task_meta: TaskMeta | None = None,
) -> mcp.types.CreateTaskResult | None:
"""Check task mode and submit to background if requested.

Args:
component: The MCP component
task_type: Type of task ("tool", "resource", "template", "prompt")
key: Docket registration key (caller resolves from contextvar + fallback)
key: Docket registration key (deprecated, use task_meta.fn_key instead)
arguments: Arguments for tool/prompt/template execution
task_meta: Task execution metadata. If provided, execute as background task.
When None, falls back to reading from contextvar for backwards compat.

Returns:
CreateTaskResult if submitted to docket, None for sync execution
Expand All @@ -44,7 +50,16 @@ async def check_background_task(
McpError: If mode="required" but no task metadata, or mode="forbidden"
but task metadata is present
"""
task_meta = get_task_metadata()
# For backwards compatibility: if task_meta not provided, check contextvar
# This is used by resources/prompts which haven't been updated yet
if task_meta is None:
task_meta_dict = get_task_metadata()
if task_meta_dict is not None:
task_meta = TaskMeta(
ttl=task_meta_dict.get("ttl"),
fn_key=key, # Use key parameter for backwards compat
)

task_config = component.task_config

# Infer label from component
Expand Down Expand Up @@ -72,4 +87,7 @@ async def check_background_task(
if not task_meta:
return None

return await submit_to_docket(task_type, key, component, arguments)
# fn_key should be set by caller (FastMCP.call_tool enriches it)
# Fall back to key parameter for backwards compat, then component.key
fn_key = task_meta.fn_key or key or component.key
return await submit_to_docket(task_type, fn_key, component, arguments, task_meta)
Loading
Loading