diff --git a/src/fastmcp/prompts/prompt.py b/src/fastmcp/prompts/prompt.py index 76fb7e624c..26360f8c11 100644 --- a/src/fastmcp/prompts/prompt.py +++ b/src/fastmcp/prompts/prompt.py @@ -6,9 +6,13 @@ import json import warnings from collections.abc import Awaitable, Callable, Sequence -from typing import Annotated, Any +from typing import TYPE_CHECKING, Any import pydantic_core + +if TYPE_CHECKING: + from docket import Docket + from docket.execution import Execution from mcp import GetPromptResult from mcp.types import ContentBlock, Icon, PromptMessage, Role, TextContent from mcp.types import Prompt as SDKPrompt @@ -229,15 +233,23 @@ async def _render( result, description=self.description, meta=self.meta ) + def register_with_docket(self, docket: Docket) -> None: + """Register this prompt with docket for background execution.""" + if self.task_config.mode == "forbidden": + return + docket.register(self.render, names=[self.key]) + + async def add_to_docket( # type: ignore[override] + self, docket: Docket, arguments: dict[str, Any] | None, **kwargs: Any + ) -> Execution: + """Schedule this prompt for background execution via docket.""" + return await docket.add(self.key, **kwargs)(arguments) + class FunctionPrompt(Prompt): """A prompt that is a function.""" fn: Callable[..., _PromptFnReturn | Awaitable[_PromptFnReturn]] - task_config: Annotated[ - TaskConfig, - Field(description="Background task execution configuration (SEP-1686)."), - ] = Field(default_factory=lambda: TaskConfig(mode="forbidden")) @classmethod def from_function( @@ -458,3 +470,22 @@ async def render( except Exception as e: logger.exception(f"Error rendering prompt {self.name}") raise PromptError(f"Error rendering prompt {self.name}.") from e + + def register_with_docket(self, docket: Docket) -> None: + """Register this prompt with docket for background execution. + + FunctionPrompt registers the underlying function, which has the user's + Depends parameters for docket to resolve. + """ + if self.task_config.mode == "forbidden": + return + docket.register(self.fn, names=[self.key]) # type: ignore[arg-type] + + async def add_to_docket( # type: ignore[override] + self, docket: Docket, arguments: dict[str, Any] | None, **kwargs: Any + ) -> Execution: + """Schedule this prompt for background execution via docket. + + FunctionPrompt splats the arguments dict since .fn expects **kwargs. + """ + return await docket.add(self.key, **kwargs)(**(arguments or {})) diff --git a/src/fastmcp/resources/resource.py b/src/fastmcp/resources/resource.py index 21d03b8d98..c2b648868c 100644 --- a/src/fastmcp/resources/resource.py +++ b/src/fastmcp/resources/resource.py @@ -6,9 +6,13 @@ import inspect import warnings from collections.abc import Callable -from typing import Annotated, Any +from typing import TYPE_CHECKING, Annotated, Any import mcp.types + +if TYPE_CHECKING: + from docket import Docket + from docket.execution import Execution import pydantic import pydantic_core from mcp.types import Annotations, Icon @@ -277,6 +281,18 @@ def key(self) -> str: """The lookup key for this resource. Returns str(uri).""" return str(self.uri) + def register_with_docket(self, docket: Docket) -> None: + """Register this resource with docket for background execution.""" + if self.task_config.mode == "forbidden": + return + docket.register(self.read, names=[self.key]) + + async def add_to_docket( # type: ignore[override] + self, docket: Docket, **kwargs: Any + ) -> Execution: + """Schedule this resource for background execution via docket.""" + return await docket.add(self.key, **kwargs)() + class FunctionResource(Resource): """A resource that defers data loading by wrapping a function. @@ -292,10 +308,6 @@ class FunctionResource(Resource): """ fn: Callable[..., Any] - task_config: Annotated[ - TaskConfig, - Field(description="Background task execution configuration (SEP-1686)."), - ] = Field(default_factory=lambda: TaskConfig(mode="forbidden")) @classmethod def from_function( @@ -366,3 +378,13 @@ async def read(self) -> str | bytes | ResourceContent: # Convert any value to ResourceContent return ResourceContent.from_value(result, mime_type=self.mime_type) + + def register_with_docket(self, docket: Docket) -> None: + """Register this resource with docket for background execution. + + FunctionResource registers the underlying function, which has the user's + Depends parameters for docket to resolve. + """ + if self.task_config.mode == "forbidden": + return + docket.register(self.fn, names=[self.key]) diff --git a/src/fastmcp/resources/template.py b/src/fastmcp/resources/template.py index ecf9824f0b..19a8722d9a 100644 --- a/src/fastmcp/resources/template.py +++ b/src/fastmcp/resources/template.py @@ -5,10 +5,14 @@ import inspect import re from collections.abc import Callable -from typing import Annotated, Any +from typing import TYPE_CHECKING, Any from urllib.parse import parse_qs, unquote from mcp.types import Annotations, Icon + +if TYPE_CHECKING: + from docket import Docket + from docket.execution import Execution from mcp.types import ResourceTemplate as SDKResourceTemplate from pydantic import ( Field, @@ -222,15 +226,23 @@ def key(self) -> str: """The lookup key for this template. Returns uri_template.""" return self.uri_template + def register_with_docket(self, docket: Docket) -> None: + """Register this template with docket for background execution.""" + if self.task_config.mode == "forbidden": + return + docket.register(self.read, names=[self.key]) + + async def add_to_docket( # type: ignore[override] + self, docket: Docket, params: dict[str, Any], **kwargs: Any + ) -> Execution: + """Schedule this template for background execution via docket.""" + return await docket.add(self.key, **kwargs)(params) + class FunctionResourceTemplate(ResourceTemplate): """A template for dynamically creating resources.""" fn: Callable[..., Any] - task_config: Annotated[ - TaskConfig, - Field(description="Background task execution configuration (SEP-1686)."), - ] = Field(default_factory=lambda: TaskConfig(mode="forbidden")) async def create_resource(self, uri: str, params: dict[str, Any]) -> Resource: """Create a resource from the template with the given parameters.""" @@ -282,6 +294,25 @@ async def read(self, arguments: dict[str, Any]) -> str | bytes: return result + def register_with_docket(self, docket: Docket) -> None: + """Register this template with docket for background execution. + + FunctionResourceTemplate registers the underlying function, which has the + user's Depends parameters for docket to resolve. + """ + if self.task_config.mode == "forbidden": + return + docket.register(self.fn, names=[self.key]) + + async def add_to_docket( # type: ignore[override] + self, docket: Docket, params: dict[str, Any], **kwargs: Any + ) -> Execution: + """Schedule this template for background execution via docket. + + FunctionResourceTemplate splats the params dict since .fn expects **kwargs. + """ + return await docket.add(self.key, **kwargs)(**params) + @classmethod def from_function( cls, diff --git a/src/fastmcp/server/server.py b/src/fastmcp/server/server.py index 8a4959f59d..47458d2b6a 100644 --- a/src/fastmcp/server/server.py +++ b/src/fastmcp/server/server.py @@ -69,9 +69,9 @@ from fastmcp.prompts import Prompt from fastmcp.prompts.prompt import FunctionPrompt, PromptResult from fastmcp.prompts.prompt_manager import PromptManager -from fastmcp.resources.resource import FunctionResource, Resource, ResourceContent +from fastmcp.resources.resource import Resource, ResourceContent from fastmcp.resources.resource_manager import ResourceManager -from fastmcp.resources.template import FunctionResourceTemplate, ResourceTemplate +from fastmcp.resources.template import ResourceTemplate from fastmcp.server.auth import AuthProvider from fastmcp.server.event_store import EventStore from fastmcp.server.http import ( @@ -416,56 +416,32 @@ async def _docket_lifespan(self) -> AsyncIterator[None]: # Store on server instance for cross-task access (FastMCPTransport) self._docket = docket - # Register local task-enabled tools/prompts/resources with Docket - # Only function-based variants support background tasks - # Register components where task execution is not "forbidden" + # Register local task-enabled components with Docket + # Each component checks task_config internally and no-ops if forbidden for tool in self._tool_manager._tools.values(): - if ( - isinstance(tool, FunctionTool) - and tool.task_config.mode != "forbidden" - ): - docket.register(tool.fn, names=[tool.key]) + tool.register_with_docket(docket) for prompt in self._prompt_manager._prompts.values(): - if ( - isinstance(prompt, FunctionPrompt) - and prompt.task_config.mode != "forbidden" - ): - # task execution requires async fn (validated at creation time) - docket.register( - cast(Callable[..., Awaitable[Any]], prompt.fn), - names=[prompt.key], - ) + prompt.register_with_docket(docket) for resource in self._resource_manager._resources.values(): - if ( - isinstance(resource, FunctionResource) - and resource.task_config.mode != "forbidden" - ): - docket.register(resource.fn, names=[resource.key]) + resource.register_with_docket(docket) for template in self._resource_manager._templates.values(): - if ( - isinstance(template, FunctionResourceTemplate) - and template.task_config.mode != "forbidden" - ): - docket.register(template.fn, names=[template.key]) + template.register_with_docket(docket) # Register provider components for provider in self._providers: try: tasks = await provider.get_tasks() for tool in tasks.tools: - docket.register(tool.fn, names=[tool.key]) + tool.register_with_docket(docket) for resource in tasks.resources: - docket.register(resource.fn, names=[resource.key]) + resource.register_with_docket(docket) for template in tasks.templates: - docket.register(template.fn, names=[template.key]) + template.register_with_docket(docket) for prompt in tasks.prompts: - docket.register( - cast(Callable[..., Awaitable[Any]], prompt.fn), - names=[prompt.key], - ) + prompt.register_with_docket(docket) except Exception as e: provider_name = getattr( provider, "server", provider @@ -650,15 +626,11 @@ async def handler(req: mcp.types.ReadResourceRequest) -> mcp.types.ServerResult: # Route to background if task metadata present and mode allows if task_meta and task_mode != "forbidden": - # For FunctionResource/FunctionResourceTemplate, use Docket - if isinstance( - resource, - FunctionResource | FunctionResourceTemplate, - ): - task_meta_dict = task_meta.model_dump(exclude_none=True) - return await handle_resource_as_task( - self, str(uri), resource, task_meta_dict - ) + # Resource has task support, use Docket for background execution + task_meta_dict = task_meta.model_dump(exclude_none=True) + return await handle_resource_as_task( + self, str(uri), resource, task_meta_dict + ) # Forbidden mode: task requested but mode="forbidden" # Raise error since resources don't have isError field @@ -1480,14 +1452,12 @@ async def _call_tool_mcp( # Route to background if task metadata present and mode allows if task_meta and task_mode != "forbidden": - # For FunctionTool, use Docket for background execution - if isinstance(tool, FunctionTool): - task_meta_dict = task_meta.model_dump(exclude_none=True) - return await handle_tool_as_task( - self, key, arguments, task_meta_dict - ) - # For ProxyTool/mounted tools, proceed with normal execution - # They will forward task metadata to their backend + # Tool has task support, use Docket for background execution + # (ProxyTool has mode="forbidden" and will not enter this branch) + task_meta_dict = task_meta.model_dump(exclude_none=True) + return await handle_tool_as_task( + self, key, arguments, task_meta_dict + ) # Forbidden mode: task requested but mode="forbidden" # Return error result with returned_immediately=True diff --git a/src/fastmcp/server/tasks/handlers.py b/src/fastmcp/server/tasks/handlers.py index 64424356ad..fc3127f090 100644 --- a/src/fastmcp/server/tasks/handlers.py +++ b/src/fastmcp/server/tasks/handlers.py @@ -100,11 +100,8 @@ async def handle_tool_as_task( await ctx.session.send_notification(notification) # type: ignore[arg-type] # Queue function to Docket by key (result storage via execution_ttl) - # Use tool.key which matches what was registered - prefixed for mounted tools - await docket.add( - tool.key, - key=task_key, - )(**arguments) + # Use tool.add_to_docket() which handles calling conventions + await tool.add_to_docket(docket, arguments, key=task_key) # Spawn subscription task to send status notifications (SEP-1686 optional feature) from fastmcp.server.tasks.subscriptions import subscribe_to_task_updates @@ -206,11 +203,8 @@ async def handle_prompt_as_task( await ctx.session.send_notification(notification) # type: ignore[arg-type] # Queue function to Docket by key (result storage via execution_ttl) - # Use prompt.key which matches what was registered - prefixed for mounted prompts - await docket.add( - prompt.key, - key=task_key, - )(**(arguments or {})) + # Use prompt.add_to_docket() which handles calling conventions + await prompt.add_to_docket(docket, arguments, key=task_key) # Spawn subscription task to send status notifications (SEP-1686 optional feature) from fastmcp.server.tasks.subscriptions import subscribe_to_task_updates @@ -310,20 +304,14 @@ async def handle_resource_as_task( await ctx.session.send_notification(notification) # type: ignore[arg-type] # Queue function to Docket by key (result storage via execution_ttl) - # For templates, extract URI params and pass them to the function - from fastmcp.resources.template import FunctionResourceTemplate, match_uri_template + # Use add_to_docket() which handles calling conventions + from fastmcp.resources.template import ResourceTemplate, match_uri_template - if isinstance(resource, FunctionResourceTemplate): + if isinstance(resource, ResourceTemplate): params = match_uri_template(uri, resource.uri_template) or {} - await docket.add( - resource.key, - key=task_key, - )(**params) + await resource.add_to_docket(docket, params, key=task_key) else: - await docket.add( - resource.key, - key=task_key, - )() + await resource.add_to_docket(docket, key=task_key) # Spawn subscription task to send status notifications (SEP-1686 optional feature) from fastmcp.server.tasks.subscriptions import subscribe_to_task_updates diff --git a/src/fastmcp/tools/tool.py b/src/fastmcp/tools/tool.py index 191f4737a8..2989ceffc1 100644 --- a/src/fastmcp/tools/tool.py +++ b/src/fastmcp/tools/tool.py @@ -46,6 +46,9 @@ ) if TYPE_CHECKING: + from docket import Docket + from docket.execution import Execution + from fastmcp.tools.tool_transform import ArgTransform, TransformedTool logger = get_logger(__name__) @@ -237,6 +240,18 @@ async def run(self, arguments: dict[str, Any]) -> ToolResult: """ raise NotImplementedError("Subclasses must implement run()") + def register_with_docket(self, docket: Docket) -> None: + """Register this tool with docket for background execution.""" + if self.task_config.mode == "forbidden": + return + docket.register(self.run, names=[self.key]) + + async def add_to_docket( # type: ignore[override] + self, docket: Docket, arguments: dict[str, Any], **kwargs: Any + ) -> Execution: + """Schedule this tool for background execution via docket.""" + return await docket.add(self.key, **kwargs)(arguments) + @classmethod def from_tool( cls, @@ -274,10 +289,6 @@ def from_tool( class FunctionTool(Tool): fn: Callable[..., Any] - task_config: Annotated[ - TaskConfig, - Field(description="Background task execution configuration (SEP-1686)."), - ] = Field(default_factory=lambda: TaskConfig(mode="forbidden")) def to_mcp_tool( self, @@ -416,6 +427,25 @@ async def run(self, arguments: dict[str, Any]) -> ToolResult: structured_content={"result": result} if wrap_result else result, ) + def register_with_docket(self, docket: Docket) -> None: + """Register this tool with docket for background execution. + + FunctionTool registers the underlying function, which has the user's + Depends parameters for docket to resolve. + """ + if self.task_config.mode == "forbidden": + return + docket.register(self.fn, names=[self.key]) + + async def add_to_docket( # type: ignore[override] + self, docket: Docket, arguments: dict[str, Any], **kwargs: Any + ) -> Execution: + """Schedule this tool for background execution via docket. + + FunctionTool splats the arguments dict since .fn expects **kwargs. + """ + return await docket.add(self.key, **kwargs)(**arguments) + def _is_object_schema(schema: dict[str, Any]) -> bool: """Check if a JSON schema represents an object type.""" diff --git a/src/fastmcp/utilities/components.py b/src/fastmcp/utilities/components.py index 833f2fb97b..679b85266d 100644 --- a/src/fastmcp/utilities/components.py +++ b/src/fastmcp/utilities/components.py @@ -1,15 +1,20 @@ from __future__ import annotations from collections.abc import Sequence -from typing import Annotated, Any, TypedDict +from typing import TYPE_CHECKING, Annotated, Any, TypedDict from mcp.types import Icon from pydantic import BeforeValidator, Field, PrivateAttr from typing_extensions import Self, TypeVar import fastmcp +from fastmcp.server.tasks.config import TaskConfig from fastmcp.utilities.types import FastMCPBaseModel +if TYPE_CHECKING: + from docket import Docket + from docket.execution import Execution + T = TypeVar("T", default=Any) @@ -55,6 +60,10 @@ class FastMCPComponent(FastMCPBaseModel): default=True, description="Whether the component is enabled.", ) + task_config: Annotated[ + TaskConfig, + Field(description="Background task execution configuration (SEP-1686)."), + ] = Field(default_factory=lambda: TaskConfig(mode="forbidden")) @property def key(self) -> str: @@ -113,6 +122,36 @@ def copy(self) -> Self: # type: ignore[override] """Create a copy of the component.""" return self.model_copy() + def register_with_docket(self, docket: Docket) -> None: + """Register this component with docket for background execution. + + No-ops if task_config.mode is "forbidden". Subclasses override to + register their callable (self.run, self.read, self.render, or self.fn). + """ + # Base implementation: no-op (subclasses override) + + async def add_to_docket( + self, docket: Docket, *args: Any, **kwargs: Any + ) -> Execution: + """Schedule this component for background execution via docket. + + Subclasses override this to handle their specific calling conventions: + - Tool: add_to_docket(docket, arguments: dict, **kwargs) + - Resource: add_to_docket(docket, **kwargs) + - ResourceTemplate: add_to_docket(docket, params: dict, **kwargs) + - Prompt: add_to_docket(docket, arguments: dict | None, **kwargs) + + The **kwargs are passed through to docket.add() (e.g., key=task_key). + """ + if self.task_config.mode == "forbidden": + raise RuntimeError( + f"Cannot add {self.__class__.__name__} '{self.name}' to docket: " + f"task_config.mode is 'forbidden'" + ) + raise NotImplementedError( + f"{self.__class__.__name__} does not implement add_to_docket()" + ) + class MirroredComponent(FastMCPComponent): """Base class for components that are mirrored from a remote server. diff --git a/tests/server/middleware/test_logging.py b/tests/server/middleware/test_logging.py index 045b998bb9..92a47dde39 100644 --- a/tests/server/middleware/test_logging.py +++ b/tests/server/middleware/test_logging.py @@ -332,7 +332,7 @@ async def test_on_message_with_resource_template_in_payload( assert get_log_lines(caplog) == snapshot( [ - '{"event": "request_start", "method": "test_method", "source": "client", "payload": "{\\"name\\":\\"tmpl\\",\\"title\\":null,\\"description\\":null,\\"icons\\":null,\\"tags\\":[],\\"meta\\":null,\\"enabled\\":true,\\"uri_template\\":\\"tmpl://{id}\\",\\"mime_type\\":\\"text/plain\\",\\"parameters\\":{\\"id\\":{\\"type\\":\\"string\\"}},\\"annotations\\":null}", "payload_type": "ResourceTemplate"}', + '{"event": "request_start", "method": "test_method", "source": "client", "payload": "{\\"name\\":\\"tmpl\\",\\"title\\":null,\\"description\\":null,\\"icons\\":null,\\"tags\\":[],\\"meta\\":null,\\"enabled\\":true,\\"task_config\\":{\\"mode\\":\\"forbidden\\"},\\"uri_template\\":\\"tmpl://{id}\\",\\"mime_type\\":\\"text/plain\\",\\"parameters\\":{\\"id\\":{\\"type\\":\\"string\\"}},\\"annotations\\":null}", "payload_type": "ResourceTemplate"}', '{"event": "request_success", "method": "test_method", "source": "client", "duration_ms": 0.02}', ] )