diff --git a/src/fastmcp/client/client.py b/src/fastmcp/client/client.py index dd19a5b563..28f282db75 100644 --- a/src/fastmcp/client/client.py +++ b/src/fastmcp/client/client.py @@ -30,7 +30,7 @@ PaginatedRequestParams, TaskStatusNotification, ) -from pydantic import AnyUrl +from pydantic import AnyUrl, RootModel import fastmcp from fastmcp.client.elicitation import ElicitationHandler, create_elicitation_callback @@ -907,41 +907,44 @@ async def _read_resource_as_task( ResourceTask: Future-like object for accessing task status and results """ # Per SEP-1686 final spec: client sends only ttl, server generates taskId - # Read resource with task metadata (no taskId sent) - result = await self.read_resource_mcp( - uri=uri, - meta={ - "modelcontextprotocol.io/task": { - "ttl": ttl, - } - }, + if isinstance(uri, str): + uri = AnyUrl(uri) + + request = mcp.types.ReadResourceRequest( + params=mcp.types.ReadResourceRequestParams( + uri=uri, + task=mcp.types.TaskMetadata(ttl=ttl), + ) ) - # Check if server accepted background execution - # If response includes task metadata with taskId, server accepted background mode - # If response includes returned_immediately=True, server declined and executed sync - task_meta = (result.meta or {}).get("modelcontextprotocol.io/task", {}) - if task_meta.get("taskId"): - # Background execution accepted - extract server-generated taskId - server_task_id = task_meta["taskId"] - # Track this task ID for list_tasks() + # Server returns CreateTaskResult (task accepted) or ReadResourceResult (graceful degradation) + TaskResponseUnion = RootModel[ + mcp.types.CreateTaskResult | mcp.types.ReadResourceResult + ] + wrapped_result = await self.session.send_request( + request=request, # type: ignore[arg-type] + result_type=TaskResponseUnion, # type: ignore[arg-type] + ) + raw_result = wrapped_result.root + + if isinstance(raw_result, mcp.types.CreateTaskResult): + # Task was accepted - extract task info from CreateTaskResult + server_task_id = raw_result.task.taskId self._submitted_task_ids.add(server_task_id) - # Create task object task_obj = ResourceTask( self, server_task_id, uri=str(uri), immediate_result=None ) - - # Register for notification routing self._task_registry[server_task_id] = weakref.ref(task_obj) # type: ignore[assignment] - return task_obj else: - # Server declined background execution (graceful degradation) - # Use a synthetic task ID for the immediate result + # Graceful degradation - server returned ReadResourceResult synthetic_task_id = task_id or str(uuid.uuid4()) return ResourceTask( - self, synthetic_task_id, uri=str(uri), immediate_result=result.contents + self, + synthetic_task_id, + uri=str(uri), + immediate_result=raw_result.contents, ) # async def subscribe_resource(self, uri: AnyUrl | str) -> None: @@ -1119,42 +1122,51 @@ async def _get_prompt_as_task( PromptTask: Future-like object for accessing task status and results """ # Per SEP-1686 final spec: client sends only ttl, server generates taskId - # Call prompt with task metadata (no taskId sent) - result = await self.get_prompt_mcp( - name=name, - arguments=arguments or {}, - meta={ - "modelcontextprotocol.io/task": { - "ttl": ttl, - } - }, + # Serialize arguments for MCP protocol + serialized_arguments: dict[str, str] | None = None + if arguments: + serialized_arguments = {} + for key, value in arguments.items(): + if isinstance(value, str): + serialized_arguments[key] = value + else: + serialized_arguments[key] = pydantic_core.to_json(value).decode( + "utf-8" + ) + + request = mcp.types.GetPromptRequest( + params=mcp.types.GetPromptRequestParams( + name=name, + arguments=serialized_arguments, + task=mcp.types.TaskMetadata(ttl=ttl), + ) + ) + + # Server returns CreateTaskResult (task accepted) or GetPromptResult (graceful degradation) + TaskResponseUnion = RootModel[ + mcp.types.CreateTaskResult | mcp.types.GetPromptResult + ] + wrapped_result = await self.session.send_request( + request=request, # type: ignore[arg-type] + result_type=TaskResponseUnion, # type: ignore[arg-type] ) + raw_result = wrapped_result.root - # Check if server accepted background execution - # If response includes task metadata with taskId, server accepted background mode - # If response includes returned_immediately=True, server declined and executed sync - task_meta = (result.meta or {}).get("modelcontextprotocol.io/task", {}) - if task_meta.get("taskId"): - # Background execution accepted - extract server-generated taskId - server_task_id = task_meta["taskId"] - # Track this task ID for list_tasks() + if isinstance(raw_result, mcp.types.CreateTaskResult): + # Task was accepted - extract task info from CreateTaskResult + server_task_id = raw_result.task.taskId self._submitted_task_ids.add(server_task_id) - # Create task object task_obj = PromptTask( self, server_task_id, prompt_name=name, immediate_result=None ) - - # Register for notification routing self._task_registry[server_task_id] = weakref.ref(task_obj) # type: ignore[assignment] - return task_obj else: - # Server declined background execution (graceful degradation) - # Use a synthetic task ID for the immediate result + # Graceful degradation - server returned GetPromptResult synthetic_task_id = task_id or str(uuid.uuid4()) return PromptTask( - self, synthetic_task_id, prompt_name=name, immediate_result=result + self, synthetic_task_id, prompt_name=name, immediate_result=raw_result ) # --- Completion --- @@ -1284,33 +1296,13 @@ async def call_tool_mcp( if isinstance(timeout, int | float): timeout = datetime.timedelta(seconds=float(timeout)) - # For task submissions, use send_request to bypass SDK validation - # Task acknowledgments don't have structured content, which would fail validation - if meta and "modelcontextprotocol.io/task" in meta: - task_dict = meta.get("modelcontextprotocol.io/task") - request = mcp.types.CallToolRequest( - params=mcp.types.CallToolRequestParams( - name=name, - arguments=arguments, - task=mcp.types.TaskMetadata(**task_dict) - if task_dict - else None, # SEP-1686: task as direct param (spec-compliant) - ) - ) - result = await self.session.send_request( - request=request, # type: ignore[arg-type] - result_type=mcp.types.CallToolResult, - request_read_timeout_seconds=timeout, # type: ignore[arg-type] - progress_callback=progress_handler or self._progress_handler, - ) - else: - result = await self.session.call_tool( - name=name, - arguments=arguments, - read_timeout_seconds=timeout, # ty: ignore[invalid-argument-type] - progress_callback=progress_handler or self._progress_handler, - meta=meta, - ) + result = await self.session.call_tool( + name=name, + arguments=arguments, + read_timeout_seconds=timeout, # ty: ignore[invalid-argument-type] + progress_callback=progress_handler or self._progress_handler, + meta=meta, + ) return result async def _parse_call_tool_result( @@ -1470,42 +1462,39 @@ async def _call_tool_as_task( ToolTask: Future-like object for accessing task status and results """ # Per SEP-1686 final spec: client sends only ttl, server generates taskId - # Call tool with task metadata (no taskId sent) - result = await self.call_tool_mcp( - name=name, - arguments=arguments or {}, - meta={ - "modelcontextprotocol.io/task": { - "ttl": ttl, - } - }, + # Build request with task metadata + request = mcp.types.CallToolRequest( + params=mcp.types.CallToolRequestParams( + name=name, + arguments=arguments or {}, + task=mcp.types.TaskMetadata(ttl=ttl), + ) + ) + + # Server returns CreateTaskResult (task accepted) or CallToolResult (graceful degradation) + # Use RootModel with Union to handle both response types (SDK calls model_validate) + TaskResponseUnion = RootModel[ + mcp.types.CreateTaskResult | mcp.types.CallToolResult + ] + wrapped_result = await self.session.send_request( + request=request, # type: ignore[arg-type] + result_type=TaskResponseUnion, # type: ignore[arg-type] ) + raw_result = wrapped_result.root - # Check if server accepted background execution - # If response includes task metadata with taskId, server accepted background mode - # If response includes returned_immediately=True, server declined and executed sync - task_meta = (result.meta or {}).get("modelcontextprotocol.io/task", {}) - if task_meta.get("taskId"): - # Background execution accepted - extract server-generated taskId - server_task_id = task_meta["taskId"] - # Track this task ID for list_tasks() + if isinstance(raw_result, mcp.types.CreateTaskResult): + # Task was accepted - extract task info from CreateTaskResult + server_task_id = raw_result.task.taskId self._submitted_task_ids.add(server_task_id) - # Create task object task_obj = ToolTask( self, server_task_id, tool_name=name, immediate_result=None ) - - # Register for notification routing self._task_registry[server_task_id] = weakref.ref(task_obj) # type: ignore[assignment] - return task_obj else: - # Server declined background execution (graceful degradation) - # or returned_immediately=True - executed synchronously - # Wrap the immediate result - parsed_result = await self._parse_call_tool_result(name, result) - # Use a synthetic task ID for the immediate result + # Graceful degradation - server returned CallToolResult + parsed_result = await self._parse_call_tool_result(name, raw_result) synthetic_task_id = task_id or str(uuid.uuid4()) return ToolTask( self, synthetic_task_id, tool_name=name, immediate_result=parsed_result diff --git a/src/fastmcp/prompts/prompt.py b/src/fastmcp/prompts/prompt.py index 26360f8c11..394a5f7147 100644 --- a/src/fastmcp/prompts/prompt.py +++ b/src/fastmcp/prompts/prompt.py @@ -208,6 +208,13 @@ async def render( """ raise NotImplementedError("Subclasses must implement render()") + def convert_result(self, raw_value: Any) -> PromptResult: + """Convert a raw return value to PromptResult. + + Subclasses should override this to handle their specific conversion logic. + """ + raise NotImplementedError("Subclasses must implement convert_result()") + async def _render( self, arguments: dict[str, Any] | None = None, @@ -432,45 +439,51 @@ async def render( if inspect.isawaitable(result): result = await result - # Validate messages - if not isinstance(result, list | tuple): - result = [result] - - # Convert result to messages - messages: list[PromptMessage] = [] - for msg in result: - try: - if isinstance(msg, PromptMessage): - messages.append(msg) - elif isinstance(msg, str): - messages.append( - PromptMessage( - role="user", - content=TextContent(type="text", text=msg), - ) - ) - else: - content = pydantic_core.to_json(msg, fallback=str).decode() - messages.append( - PromptMessage( - role="user", - content=TextContent(type="text", text=content), - ) - ) - except Exception as e: - raise PromptError( - "Could not convert prompt result to message." - ) from e - - return PromptResult( - messages=messages, - description=self.description, - meta=self.meta, - ) + return self.convert_result(result) except Exception as e: logger.exception(f"Error rendering prompt {self.name}") raise PromptError(f"Error rendering prompt {self.name}.") from e + def convert_result(self, raw_value: Any) -> PromptResult: + """Convert a raw return value to PromptResult. + + This handles the same conversion logic as render(), but works on + already-executed raw values (e.g., from Docket background execution). + """ + # Normalize to list + if not isinstance(raw_value, list | tuple): + raw_value = [raw_value] + + # Convert result to messages + messages: list[PromptMessage] = [] + for msg in raw_value: + try: + if isinstance(msg, PromptMessage): + messages.append(msg) + elif isinstance(msg, str): + messages.append( + PromptMessage( + role="user", + content=TextContent(type="text", text=msg), + ) + ) + else: + content = pydantic_core.to_json(msg, fallback=str).decode() + messages.append( + PromptMessage( + role="user", + content=TextContent(type="text", text=content), + ) + ) + except Exception as e: + raise PromptError("Could not convert prompt result to message.") from e + + return PromptResult( + messages=messages, + description=self.description, + meta=self.meta, + ) + def register_with_docket(self, docket: Docket) -> None: """Register this prompt with docket for background execution. diff --git a/src/fastmcp/resources/resource.py b/src/fastmcp/resources/resource.py index c2b648868c..60e6cd3aeb 100644 --- a/src/fastmcp/resources/resource.py +++ b/src/fastmcp/resources/resource.py @@ -376,8 +376,15 @@ async def read(self) -> str | bytes | ResourceContent: if isinstance(result, Resource): return await result.read() - # Convert any value to ResourceContent - return ResourceContent.from_value(result, mime_type=self.mime_type) + return self.convert_result(result) + + def convert_result(self, raw_value: Any) -> ResourceContent: + """Convert a raw return value to ResourceContent. + + This handles the same conversion logic as read(), but works on + already-executed raw values (e.g., from Docket background execution). + """ + return ResourceContent.from_value(raw_value, mime_type=self.mime_type) def register_with_docket(self, docket: Docket) -> None: """Register this resource with docket for background execution. diff --git a/src/fastmcp/server/server.py b/src/fastmcp/server/server.py index 47458d2b6a..e42bcab681 100644 --- a/src/fastmcp/server/server.py +++ b/src/fastmcp/server/server.py @@ -84,11 +84,6 @@ from fastmcp.server.providers import Provider from fastmcp.server.tasks.capabilities import get_task_capabilities from fastmcp.server.tasks.config import TaskConfig -from fastmcp.server.tasks.handlers import ( - handle_prompt_as_task, - handle_resource_as_task, - handle_tool_as_task, -) from fastmcp.settings import Settings from fastmcp.tools.tool import FunctionTool, Tool, ToolResult from fastmcp.tools.tool_manager import ToolManager @@ -569,175 +564,39 @@ def run( ) def _setup_handlers(self) -> None: - """Set up core MCP protocol handlers.""" + """Set up core MCP protocol handlers. + + We override the SDK's default handlers for tools/call, resources/read, + and prompts/get to add task-augmented execution support (SEP-1686). + + The SDK's decorators have different capabilities: + - call_tool: Supports CreateTaskResult returns AND validate_input + - read_resource: Does NOT support CreateTaskResult + - get_prompt: Does NOT support CreateTaskResult + + So we use the SDK decorator for tools (to get input validation), but + register custom handlers for resources and prompts. + """ self._mcp_server.list_tools()(self._list_tools_mcp) self._mcp_server.list_resources()(self._list_resources_mcp) self._mcp_server.list_resource_templates()(self._list_resource_templates_mcp) self._mcp_server.list_prompts()(self._list_prompts_mcp) + + # Tools: SDK decorator provides validate_input + CreateTaskResult support self._mcp_server.call_tool(validate_input=self.strict_input_validation)( self._call_tool_mcp ) - # Register custom read_resource handler (SDK decorator doesn't support CreateTaskResult) - self._setup_read_resource_handler() - # Register custom get_prompt handler (SDK decorator doesn't support CreateTaskResult) - self._setup_get_prompt_handler() - # Register custom SEP-1686 task protocol handlers - self._setup_task_protocol_handlers() - - def _setup_read_resource_handler(self) -> None: - """ - Set up custom read_resource handler that supports task-augmented responses. - - The SDK's read_resource decorator doesn't support CreateTaskResult returns, - so we register a custom handler that checks request_context.experimental.is_task. - """ - - async def handler(req: mcp.types.ReadResourceRequest) -> mcp.types.ServerResult: - uri = req.params.uri - - # Check for task metadata via SDK's request context - task_meta = None - try: - ctx = self._mcp_server.request_context - if ctx.experimental.is_task: - task_meta = ctx.experimental.task_metadata - except (AttributeError, LookupError): - pass - - # Check for task metadata and route appropriately - async with fastmcp.server.context.Context(fastmcp=self): - # Get resource including from mounted servers - resource = await self._get_resource_or_template_or_none(str(uri)) - if ( - resource - and self._should_enable_component(resource) - and hasattr(resource, "task_config") - ): - task_mode = resource.task_config.mode # type: ignore[union-attr] - - # Enforce mode="required" - must have task metadata - if task_mode == "required" and not task_meta: - raise McpError( - ErrorData( - code=METHOD_NOT_FOUND, - message=f"Resource '{uri}' requires task-augmented execution", - ) - ) - - # Route to background if task metadata present and mode allows - if task_meta and task_mode != "forbidden": - # Resource has task support, use Docket for background execution - task_meta_dict = task_meta.model_dump(exclude_none=True) - return await handle_resource_as_task( - self, str(uri), resource, task_meta_dict - ) - - # Forbidden mode: task requested but mode="forbidden" - # Raise error since resources don't have isError field - if task_meta and task_mode == "forbidden": - raise McpError( - ErrorData( - code=METHOD_NOT_FOUND, - message=f"Resource '{uri}' does not support task-augmented execution", - ) - ) - - # Synchronous execution - result = await self._read_resource_mcp(uri) - - # Graceful degradation: if we got here with task_meta, something went wrong - # (This should be unreachable now that forbidden raises) - if task_meta: - mcp_contents = [item.to_mcp_resource_contents(uri) for item in result] - return mcp.types.ServerResult( - mcp.types.ReadResourceResult( - contents=mcp_contents, - _meta={ - "modelcontextprotocol.io/task": { - "returned_immediately": True - } - }, - ) - ) - - # Convert to proper ServerResult - if isinstance(result, mcp.types.ServerResult): - return result - - mcp_contents = [item.to_mcp_resource_contents(uri) for item in result] - return mcp.types.ServerResult( - mcp.types.ReadResourceResult(contents=mcp_contents) - ) - - self._mcp_server.request_handlers[mcp.types.ReadResourceRequest] = handler - def _setup_get_prompt_handler(self) -> None: - """ - Set up custom get_prompt handler that supports task-augmented responses. - - The SDK's get_prompt decorator doesn't support CreateTaskResult returns, - so we register a custom handler that checks request_context.experimental.is_task. - """ - - async def handler(req: mcp.types.GetPromptRequest) -> mcp.types.ServerResult: - name = req.params.name - arguments = req.params.arguments - - # Check for task metadata via SDK's request context - task_meta = None - try: - ctx = self._mcp_server.request_context - if ctx.experimental.is_task: - task_meta = ctx.experimental.task_metadata - except (AttributeError, LookupError): - pass - - # Check for task metadata and route appropriately - async with fastmcp.server.context.Context(fastmcp=self): - try: - prompt = await self.get_prompt(name) - except NotFoundError: - prompt = None - if ( - prompt - and self._should_enable_component(prompt) - and hasattr(prompt, "task_config") - and prompt.task_config - ): - task_mode = prompt.task_config.mode # type: ignore[union-attr] - - # Enforce mode="required" - must have task metadata - if task_mode == "required" and not task_meta: - raise McpError( - ErrorData( - code=METHOD_NOT_FOUND, - message=f"Prompt '{name}' requires task-augmented execution", - ) - ) - - # Route to background if task metadata present and mode allows - if task_meta and task_mode != "forbidden": - task_meta_dict = task_meta.model_dump(exclude_none=True) - result = await handle_prompt_as_task( - self, name, arguments, task_meta_dict - ) - return mcp.types.ServerResult(result) - - # Forbidden mode: task requested but mode="forbidden" - # Raise error since prompts don't have isError field - if task_meta and task_mode == "forbidden": - raise McpError( - ErrorData( - code=METHOD_NOT_FOUND, - message=f"Prompt '{name}' does not support task-augmented execution", - ) - ) - - # Synchronous execution - result = await self._get_prompt_mcp(name, arguments) - return mcp.types.ServerResult(result) + # Resources/Prompts: Custom handlers (SDK decorators don't support CreateTaskResult) + self._mcp_server.request_handlers[mcp.types.ReadResourceRequest] = ( + self._read_resource_handler + ) + self._mcp_server.request_handlers[mcp.types.GetPromptRequest] = ( + self._get_prompt_handler + ) - self._mcp_server.request_handlers[mcp.types.GetPromptRequest] = handler + # Register SEP-1686 task protocol handlers + self._setup_task_protocol_handlers() def _setup_task_protocol_handlers(self) -> None: """Register SEP-1686 task protocol handlers with SDK.""" @@ -1402,6 +1261,7 @@ async def _call_tool_mcp( list[ContentBlock] | tuple[list[ContentBlock], dict[str, Any]] | mcp.types.CallToolResult + | mcp.types.CreateTaskResult ): """ Handle MCP 'callTool' requests. @@ -1415,6 +1275,8 @@ async def _call_tool_mcp( Returns: List of MCP Content objects containing the tool results """ + from fastmcp.server.tasks.handlers import handle_tool_as_task + logger.debug( f"[{self.name}] Handler called: call_tool %s with %s", key, arguments ) @@ -1453,7 +1315,6 @@ async def _call_tool_mcp( # Route to background if task metadata present and mode allows if task_meta and task_mode != "forbidden": # Tool has task support, use Docket for background execution - # (ProxyTool has mode="forbidden" and will not enter this branch) task_meta_dict = task_meta.model_dump(exclude_none=True) return await handle_tool_as_task( self, key, arguments, task_meta_dict @@ -1485,6 +1346,135 @@ async def _call_tool_mcp( except NotFoundError as e: raise NotFoundError(f"Unknown tool: {key}") from e + async def _read_resource_handler( + self, req: mcp.types.ReadResourceRequest + ) -> mcp.types.ServerResult: + """Handle resources/read requests with task-augmented execution support. + + This is a custom handler because the SDK's read_resource decorator + does not support returning CreateTaskResult for background tasks. + """ + from fastmcp.server.tasks.handlers import handle_resource_as_task + + uri = req.params.uri + + # Check for task metadata via SDK's request context + task_meta = None + try: + ctx = self._mcp_server.request_context + if ctx.experimental.is_task: + task_meta = ctx.experimental.task_metadata + except (AttributeError, LookupError): + pass + + async with fastmcp.server.context.Context(fastmcp=self): + # Get resource including from mounted servers + resource = await self._get_resource_or_template_or_none(str(uri)) + if ( + resource + and self._should_enable_component(resource) + and hasattr(resource, "task_config") + ): + task_mode = resource.task_config.mode # type: ignore[union-attr] + + # Enforce mode="required" - must have task metadata + if task_mode == "required" and not task_meta: + raise McpError( + ErrorData( + code=METHOD_NOT_FOUND, + message=f"Resource '{uri}' requires task-augmented execution", + ) + ) + + # Route to background if task metadata present and mode allows + if task_meta and task_mode != "forbidden": + task_meta_dict = task_meta.model_dump(exclude_none=True) + result = await handle_resource_as_task( + self, str(uri), resource, task_meta_dict + ) + return mcp.types.ServerResult(result) + + # Forbidden mode: task requested but mode="forbidden" + if task_meta and task_mode == "forbidden": + raise McpError( + ErrorData( + code=METHOD_NOT_FOUND, + message=f"Resource '{uri}' does not support task-augmented execution", + ) + ) + + # Synchronous execution + contents = await self._read_resource_mcp(uri) + mcp_contents = [item.to_mcp_resource_contents(uri) for item in contents] + return mcp.types.ServerResult( + mcp.types.ReadResourceResult(contents=mcp_contents) + ) + + async def _get_prompt_handler( + self, req: mcp.types.GetPromptRequest + ) -> mcp.types.ServerResult: + """Handle prompts/get requests with task-augmented execution support. + + This is a custom handler because the SDK's get_prompt decorator + does not support returning CreateTaskResult for background tasks. + """ + from fastmcp.server.tasks.handlers import handle_prompt_as_task + + name = req.params.name + arguments = req.params.arguments + + # Check for task metadata via SDK's request context + task_meta = None + try: + ctx = self._mcp_server.request_context + if ctx.experimental.is_task: + task_meta = ctx.experimental.task_metadata + except (AttributeError, LookupError): + pass + + async with fastmcp.server.context.Context(fastmcp=self): + try: + prompt = await self.get_prompt(name) + except NotFoundError: + prompt = None + if ( + prompt + and self._should_enable_component(prompt) + and hasattr(prompt, "task_config") + and prompt.task_config + ): + task_mode = prompt.task_config.mode # type: ignore[union-attr] + + # Enforce mode="required" - must have task metadata + if task_mode == "required" and not task_meta: + raise McpError( + ErrorData( + code=METHOD_NOT_FOUND, + message=f"Prompt '{name}' requires task-augmented execution", + ) + ) + + # Route to background if task metadata present and mode allows + if task_meta and task_mode != "forbidden": + task_meta_dict = task_meta.model_dump(exclude_none=True) + result = await handle_prompt_as_task( + self, name, arguments, task_meta_dict + ) + return mcp.types.ServerResult(result) + + # Forbidden mode: task requested but mode="forbidden" + if task_meta and task_mode == "forbidden": + raise McpError( + ErrorData( + code=METHOD_NOT_FOUND, + message=f"Prompt '{name}' does not support task-augmented execution", + ) + ) + + # Synchronous execution + result = await self._get_prompt_mcp(name, arguments) + return mcp.types.ServerResult(result) + async def _call_tool_middleware( self, key: str, diff --git a/src/fastmcp/server/tasks/converters.py b/src/fastmcp/server/tasks/converters.py deleted file mode 100644 index 42ea1abed9..0000000000 --- a/src/fastmcp/server/tasks/converters.py +++ /dev/null @@ -1,204 +0,0 @@ -"""SEP-1686 task result converters. - -Converts raw task return values to MCP result types. -""" - -from __future__ import annotations - -import base64 -from typing import TYPE_CHECKING, Any - -import mcp.types -import pydantic_core - -from fastmcp.resources.resource import ResourceContent - -if TYPE_CHECKING: - from fastmcp.server.server import FastMCP - - -async def convert_tool_result( - server: FastMCP, raw_value: Any, tool_name: str, client_task_id: str -) -> mcp.types.CallToolResult: - """Convert raw tool return value to MCP CallToolResult. - - Replicates the serialization logic from tool.run() to properly handle - output_schema, structured content, etc. - - Args: - server: FastMCP server instance - raw_value: The raw return value from user's tool function - tool_name: Name of the tool (to get output_schema and serializer) - client_task_id: Client task ID for related-task metadata - - Returns: - CallToolResult with properly formatted content and structured content - """ - # Import here to avoid circular import: - # tools/tool.py -> tasks/config.py -> tasks/__init__.py -> converters.py -> tools/tool.py - from fastmcp.tools.tool import ToolResult, _convert_to_content - - # Get the tool to access its configuration - tool = await server.get_tool(tool_name) - - # Build related-task metadata - related_task_meta = { - "modelcontextprotocol.io/related-task": { - "taskId": client_task_id, - } - } - - # If raw value is already ToolResult, use it directly - if isinstance(raw_value, ToolResult): - mcp_result = raw_value.to_mcp_result() - if isinstance(mcp_result, mcp.types.CallToolResult): - # Add metadata - mcp_result._meta = related_task_meta - return mcp_result - elif isinstance(mcp_result, tuple): - content, structured_content = mcp_result - return mcp.types.CallToolResult( - content=content, - structuredContent=structured_content, - _meta=related_task_meta, - ) - else: - return mcp.types.CallToolResult(content=mcp_result, _meta=related_task_meta) - - # Convert raw value to content blocks - unstructured_result = _convert_to_content(raw_value, serializer=tool.serializer) - - # Handle structured content creation (same logic as tool.run()) - structured_content = None - - if tool.output_schema is None: - # Try to serialize as dict for structured content - try: - sc = pydantic_core.to_jsonable_python(raw_value) - if isinstance(sc, dict): - structured_content = sc - except pydantic_core.PydanticSerializationError: - pass - else: - # Has output_schema - convert to JSON-able types - jsonable_value = pydantic_core.to_jsonable_python(raw_value) - wrap_result = tool.output_schema.get("x-fastmcp-wrap-result") - structured_content = ( - {"result": jsonable_value} if wrap_result else jsonable_value - ) - - return mcp.types.CallToolResult( - content=unstructured_result, - structuredContent=structured_content, - _meta=related_task_meta, - ) - - -async def convert_prompt_result( - server: FastMCP, raw_value: Any, prompt_name: str, client_task_id: str -) -> mcp.types.GetPromptResult: - """Convert raw prompt return value to MCP GetPromptResult. - - The user function returns raw values (strings, dicts, lists) that need - to be converted to PromptMessage objects. - - Args: - server: FastMCP server instance - raw_value: The raw return value from user's prompt function - prompt_name: Name of the prompt - client_task_id: Client task ID for related-task metadata - - Returns: - GetPromptResult with properly formatted messages - """ - from fastmcp.prompts.prompt import PromptMessage - - # Get the prompt for metadata - prompt = await server.get_prompt(prompt_name) - - # Normalize to list - if not isinstance(raw_value, list | tuple): - raw_value = [raw_value] - - # Convert to PromptMessages - messages: list[mcp.types.PromptMessage] = [] - for msg in raw_value: - if isinstance(msg, PromptMessage): - # PromptMessage is imported from mcp.types - use directly - messages.append(msg) - elif isinstance(msg, str): - messages.append( - mcp.types.PromptMessage( - role="user", - content=mcp.types.TextContent(type="text", text=msg), - ) - ) - elif isinstance(msg, dict): - messages.append(mcp.types.PromptMessage.model_validate(msg)) - else: - raise ValueError(f"Invalid message type: {type(msg)}") - - return mcp.types.GetPromptResult( - description=prompt.description or "", - messages=messages, - _meta={ - "modelcontextprotocol.io/related-task": { - "taskId": client_task_id, - } - }, - ) - - -async def convert_resource_result( - server: FastMCP, - raw_value: str | bytes | ResourceContent, - uri: str, - client_task_id: str, -) -> dict[str, Any]: - """Convert resource result to MCP resource contents dict. - - Args: - server: FastMCP server instance - raw_value: Result from the resource function (str, bytes, or ResourceContent) - uri: Resource URI (for the contents response) - client_task_id: Client task ID for related-task metadata - - Returns: - Dict with 'contents' key containing list of resource contents - """ - # Build related-task metadata - related_task_meta = { - "modelcontextprotocol.io/related-task": { - "taskId": client_task_id, - } - } - - # Convert to ResourceContent if needed (handles str, bytes) - if not isinstance(raw_value, ResourceContent): - raw_value = ResourceContent.from_value(raw_value) - - # Extract content from ResourceContent - content = raw_value.content - mime_type = raw_value.mime_type - content_meta = raw_value.meta - - if isinstance(content, str): - content_dict: dict[str, Any] = { - "uri": uri, - "text": content, - "mimeType": mime_type or "text/plain", - } - else: - content_dict = { - "uri": uri, - "blob": base64.b64encode(content).decode(), - "mimeType": mime_type or "application/octet-stream", - } - - if content_meta: - content_dict["_meta"] = content_meta - - return { - "contents": [content_dict], - "_meta": related_task_meta, - } diff --git a/src/fastmcp/server/tasks/handlers.py b/src/fastmcp/server/tasks/handlers.py index fc3127f090..dd33d13744 100644 --- a/src/fastmcp/server/tasks/handlers.py +++ b/src/fastmcp/server/tasks/handlers.py @@ -18,6 +18,8 @@ from fastmcp.server.tasks.keys import build_task_key if TYPE_CHECKING: + from fastmcp.resources.resource import Resource + from fastmcp.resources.template import ResourceTemplate from fastmcp.server.server import FastMCP # Redis mapping TTL buffer: Add 15 minutes to Docket's execution_ttl @@ -28,29 +30,32 @@ async def handle_tool_as_task( server: FastMCP, tool_name: str, arguments: dict[str, Any], - task_meta: dict[str, Any], -) -> mcp.types.CallToolResult: + _task_meta: dict[str, Any], +) -> mcp.types.CreateTaskResult: """Handle tool execution as background task (SEP-1686). Queues the user's actual function to Docket (preserving signature for DI), stores raw return values, converts to MCP types on retrieval. + Note: Client-requested TTL in task_meta is intentionally ignored. + Server-side TTL policy (docket.execution_ttl) takes precedence for + consistent task lifecycle management. + Args: server: FastMCP server instance tool_name: Name of the tool to execute arguments: Tool arguments - task_meta: Task metadata from request (contains ttl) + _task_meta: Task metadata from request (unused - server TTL policy applies) Returns: - CallToolResult: Task stub with task metadata in _meta + CreateTaskResult: Task stub with proper Task object """ # Generate server-side task ID per SEP-1686 final spec (line 375-377) # Server MUST generate task IDs, clients no longer provide them server_task_id = str(uuid.uuid4()) # Record creation timestamp per SEP-1686 final spec (line 430) - # Format as ISO 8601 / RFC 3339 timestamp - created_at = datetime.now(timezone.utc).isoformat() + created_at = datetime.now(timezone.utc) # Get session ID and Docket ctx = get_context() @@ -79,7 +84,7 @@ async def handle_tool_as_task( ) async with docket.redis() as redis: await redis.set(redis_key, task_key, ex=ttl_seconds) - await redis.set(created_at_key, created_at, ex=ttl_seconds) + await redis.set(created_at_key, created_at.isoformat(), ex=ttl_seconds) # Send notifications/tasks/created per SEP-1686 (mandatory) # Send BEFORE queuing to avoid race where task completes before notification @@ -118,16 +123,17 @@ async def handle_tool_as_task( docket, ) - # Return task stub + # Return CreateTaskResult with proper Task object # Tasks MUST begin in "working" status per SEP-1686 final spec (line 381) - return mcp.types.CallToolResult( - content=[], - _meta={ - "modelcontextprotocol.io/task": { - "taskId": server_task_id, - "status": "working", - } - }, + return mcp.types.CreateTaskResult( + task=mcp.types.Task( + taskId=server_task_id, + status="working", + createdAt=created_at, + lastUpdatedAt=created_at, + ttl=int(docket.execution_ttl.total_seconds() * 1000), + pollInterval=1000, + ) ) @@ -135,28 +141,30 @@ async def handle_prompt_as_task( server: FastMCP, prompt_name: str, arguments: dict[str, Any] | None, - task_meta: dict[str, Any], -) -> mcp.types.GetPromptResult: + _task_meta: dict[str, Any], +) -> mcp.types.CreateTaskResult: """Handle prompt execution as background task (SEP-1686). Queues the user's actual function to Docket (preserving signature for DI). + Note: Client-requested TTL in task_meta is intentionally ignored. + Server-side TTL policy (docket.execution_ttl) takes precedence. + Args: server: FastMCP server instance prompt_name: Name of the prompt to execute arguments: Prompt arguments - task_meta: Task metadata from request (contains ttl) + _task_meta: Task metadata from request (unused - server TTL policy applies) Returns: - GetPromptResult: Task stub with task metadata in _meta + CreateTaskResult: Task stub with proper Task object """ # Generate server-side task ID per SEP-1686 final spec (line 375-377) # Server MUST generate task IDs, clients no longer provide them server_task_id = str(uuid.uuid4()) # Record creation timestamp per SEP-1686 final spec (line 430) - # Format as ISO 8601 / RFC 3339 timestamp - created_at = datetime.now(timezone.utc).isoformat() + created_at = datetime.now(timezone.utc) # Get session ID and Docket ctx = get_context() @@ -185,7 +193,7 @@ async def handle_prompt_as_task( ) async with docket.redis() as redis: await redis.set(redis_key, task_key, ex=ttl_seconds) - await redis.set(created_at_key, created_at, ex=ttl_seconds) + await redis.set(created_at_key, created_at.isoformat(), ex=ttl_seconds) # Send notifications/tasks/created per SEP-1686 (mandatory) # Send BEFORE queuing to avoid race where task completes before notification @@ -221,46 +229,48 @@ async def handle_prompt_as_task( docket, ) - # Return task stub + # Return CreateTaskResult with proper Task object # Tasks MUST begin in "working" status per SEP-1686 final spec (line 381) - return mcp.types.GetPromptResult( - description="", - messages=[], - _meta={ - "modelcontextprotocol.io/task": { - "taskId": server_task_id, - "status": "working", - } - }, + return mcp.types.CreateTaskResult( + task=mcp.types.Task( + taskId=server_task_id, + status="working", + createdAt=created_at, + lastUpdatedAt=created_at, + ttl=int(docket.execution_ttl.total_seconds() * 1000), + pollInterval=1000, + ) ) async def handle_resource_as_task( - server: FastMCP, + _server: FastMCP, uri: str, - resource, # Resource or ResourceTemplate - task_meta: dict[str, Any], -) -> mcp.types.ServerResult: + resource: Resource | ResourceTemplate, + _task_meta: dict[str, Any], +) -> mcp.types.CreateTaskResult: """Handle resource read as background task (SEP-1686). Queues the user's actual function to Docket. + Note: Client-requested TTL in task_meta is intentionally ignored. + Server-side TTL policy (docket.execution_ttl) takes precedence. + Args: - server: FastMCP server instance + _server: FastMCP server instance (unused - kept for signature consistency) uri: Resource URI resource: Resource or ResourceTemplate object - task_meta: Task metadata from request (contains ttl) + _task_meta: Task metadata from request (unused - server TTL policy applies) Returns: - ServerResult with ReadResourceResult stub + CreateTaskResult: Task stub with proper Task object """ # Generate server-side task ID per SEP-1686 final spec (line 375-377) # Server MUST generate task IDs, clients no longer provide them server_task_id = str(uuid.uuid4()) # Record creation timestamp per SEP-1686 final spec (line 430) - # Format as ISO 8601 / RFC 3339 timestamp - created_at = datetime.now(timezone.utc).isoformat() + created_at = datetime.now(timezone.utc) # Get session ID and Docket ctx = get_context() @@ -286,7 +296,7 @@ async def handle_resource_as_task( ) async with docket.redis() as redis: await redis.set(redis_key, task_key, ex=ttl_seconds) - await redis.set(created_at_key, created_at, ex=ttl_seconds) + await redis.set(created_at_key, created_at.isoformat(), ex=ttl_seconds) # Send notifications/tasks/created per SEP-1686 (mandatory) # Send BEFORE queuing to avoid race where task completes before notification @@ -328,16 +338,15 @@ async def handle_resource_as_task( docket, ) - # Return task stub + # Return CreateTaskResult with proper Task object # Tasks MUST begin in "working" status per SEP-1686 final spec (line 381) - return mcp.types.ServerResult( - mcp.types.ReadResourceResult( - contents=[], - _meta={ - "modelcontextprotocol.io/task": { - "taskId": server_task_id, - "status": "working", - } - }, + return mcp.types.CreateTaskResult( + task=mcp.types.Task( + taskId=server_task_id, + status="working", + createdAt=created_at, + lastUpdatedAt=created_at, + ttl=int(docket.execution_ttl.total_seconds() * 1000), + pollInterval=1000, ) ) diff --git a/src/fastmcp/server/tasks/protocol.py b/src/fastmcp/server/tasks/protocol.py index 902947862b..bf02a43d6d 100644 --- a/src/fastmcp/server/tasks/protocol.py +++ b/src/fastmcp/server/tasks/protocol.py @@ -20,11 +20,6 @@ ListTasksResult, ) -from fastmcp.server.tasks.converters import ( - convert_prompt_result, - convert_resource_result, - convert_tool_result, -) from fastmcp.server.tasks.keys import parse_task_key if TYPE_CHECKING: @@ -230,20 +225,58 @@ async def tasks_result_handler(server: FastMCP, params: dict[str, Any]) -> Any: # Parse task key to get type and component info key_parts = parse_task_key(task_key) task_type = key_parts["task_type"] + component_id = key_parts["component_identifier"] + + # Build related-task metadata + related_task_meta = { + "modelcontextprotocol.io/related-task": { + "taskId": client_task_id, + } + } - # Convert based on task type (pass client_task_id for metadata) + # Convert based on task type using component.convert_result() + to_mcp_result() if task_type == "tool": - return await convert_tool_result( - server, raw_value, key_parts["component_identifier"], client_task_id - ) + tool = await server.get_tool(component_id) + fastmcp_result = tool.convert_result(raw_value) + mcp_result = fastmcp_result.to_mcp_result() + # Ensure we have a CallToolResult and add metadata + if isinstance(mcp_result, mcp.types.CallToolResult): + mcp_result._meta = related_task_meta # type: ignore[attr-defined] + elif isinstance(mcp_result, tuple): + content, structured_content = mcp_result + mcp_result = mcp.types.CallToolResult( + content=content, + structuredContent=structured_content, + _meta=related_task_meta, + ) + else: + mcp_result = mcp.types.CallToolResult( + content=mcp_result, _meta=related_task_meta + ) + return mcp_result + elif task_type == "prompt": - return await convert_prompt_result( - server, raw_value, key_parts["component_identifier"], client_task_id - ) + prompt = await server.get_prompt(component_id) + fastmcp_result = prompt.convert_result(raw_value) + mcp_result = fastmcp_result.to_mcp_prompt_result() + mcp_result._meta = related_task_meta # type: ignore[attr-defined] + return mcp_result + elif task_type == "resource": - return await convert_resource_result( - server, raw_value, key_parts["component_identifier"], client_task_id + # Convert raw value to ResourceContent (handles str, bytes, ResourceContent) + from fastmcp.resources.resource import ResourceContent + + if isinstance(raw_value, ResourceContent): + resource_content = raw_value + else: + resource_content = ResourceContent.from_value(raw_value) + + mcp_content = resource_content.to_mcp_resource_contents(component_id) + return mcp.types.ReadResourceResult( + contents=[mcp_content], + _meta=related_task_meta, ) + else: raise McpError( ErrorData( diff --git a/src/fastmcp/tools/tool.py b/src/fastmcp/tools/tool.py index 2989ceffc1..4adafe0ae6 100644 --- a/src/fastmcp/tools/tool.py +++ b/src/fastmcp/tools/tool.py @@ -240,6 +240,13 @@ async def run(self, arguments: dict[str, Any]) -> ToolResult: """ raise NotImplementedError("Subclasses must implement run()") + def convert_result(self, raw_value: Any) -> ToolResult: + """Convert a raw return value to ToolResult. + + Subclasses should override this to handle their specific conversion logic. + """ + raise NotImplementedError("Subclasses must implement convert_result()") + def register_with_docket(self, docket: Docket) -> None: """Register this tool with docket for background execution.""" if self.task_config.mode == "forbidden": @@ -393,22 +400,30 @@ async def run(self, arguments: dict[str, Any]) -> ToolResult: if inspect.isawaitable(result): result = await result - if isinstance(result, ToolResult): - return result + return self.convert_result(result) + + def convert_result(self, raw_value: Any) -> ToolResult: + """Convert a raw return value to ToolResult. + + This handles the same conversion logic as run(), but works on + already-executed raw values (e.g., from Docket background execution). + """ + if isinstance(raw_value, ToolResult): + return raw_value - unstructured_result = _convert_to_content(result, serializer=self.serializer) + unstructured_result = _convert_to_content(raw_value, serializer=self.serializer) if self.output_schema is None: # Do not produce a structured output for MCP Content Types - if isinstance(result, ContentBlock | Audio | Image | File) or ( - isinstance(result, list | tuple) - and any(isinstance(item, ContentBlock) for item in result) + if isinstance(raw_value, ContentBlock | Audio | Image | File) or ( + isinstance(raw_value, list | tuple) + and any(isinstance(item, ContentBlock) for item in raw_value) ): return ToolResult(content=unstructured_result) # Otherwise, try to serialize the result as a dict try: - structured_content = pydantic_core.to_jsonable_python(result) + structured_content = pydantic_core.to_jsonable_python(raw_value) if isinstance(structured_content, dict): return ToolResult( content=unstructured_result, @@ -424,7 +439,7 @@ async def run(self, arguments: dict[str, Any]) -> ToolResult: return ToolResult( content=unstructured_result, - structured_content={"result": result} if wrap_result else result, + structured_content={"result": raw_value} if wrap_result else raw_value, ) def register_with_docket(self, docket: Docket) -> None: diff --git a/tests/server/tasks/test_task_config_modes.py b/tests/server/tasks/test_task_config_modes.py index c008799968..d2e7510b5d 100644 --- a/tests/server/tasks/test_task_config_modes.py +++ b/tests/server/tasks/test_task_config_modes.py @@ -7,6 +7,7 @@ """ import pytest +from mcp.shared.exceptions import McpError from fastmcp import FastMCP from fastmcp.client import Client @@ -104,9 +105,8 @@ async def optional_tool() -> str: return mcp async def test_required_mode_without_task_returns_error(self, server): - """Required mode returns error when called without task metadata.""" + """Required mode raises error when called without task metadata.""" async with Client(server) as client: - # The server returns isError=True, which the client converts to ToolError with pytest.raises(ToolError) as exc_info: await client.call_tool("required_tool", {}) @@ -180,7 +180,6 @@ async def optional_resource() -> str: async def test_required_resource_without_task_returns_error(self, server): """Required mode returns error when read without task metadata.""" - from mcp.shared.exceptions import McpError from mcp.types import METHOD_NOT_FOUND async with Client(server) as client: @@ -233,7 +232,6 @@ async def optional_prompt() -> str: async def test_required_prompt_without_task_returns_error(self, server): """Required mode returns error when called without task metadata.""" - from mcp.shared.exceptions import McpError from mcp.types import METHOD_NOT_FOUND async with Client(server) as client: