Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 36 additions & 5 deletions src/fastmcp/prompts/prompt.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,13 @@
import json
import warnings
from collections.abc import Awaitable, Callable, Sequence
from typing import Annotated, Any
from typing import TYPE_CHECKING, Any

import pydantic_core

if TYPE_CHECKING:
from docket import Docket
from docket.execution import Execution
from mcp import GetPromptResult
from mcp.types import ContentBlock, Icon, PromptMessage, Role, TextContent
from mcp.types import Prompt as SDKPrompt
Expand Down Expand Up @@ -229,15 +233,23 @@ async def _render(
result, description=self.description, meta=self.meta
)

def register_with_docket(self, docket: Docket) -> None:
"""Register this prompt with docket for background execution."""
if self.task_config.mode == "forbidden":
return
docket.register(self.render, names=[self.key])

async def add_to_docket( # type: ignore[override]
self, docket: Docket, arguments: dict[str, Any] | None, **kwargs: Any
) -> Execution:
"""Schedule this prompt for background execution via docket."""
return await docket.add(self.key, **kwargs)(arguments)


class FunctionPrompt(Prompt):
"""A prompt that is a function."""

fn: Callable[..., _PromptFnReturn | Awaitable[_PromptFnReturn]]
task_config: Annotated[
TaskConfig,
Field(description="Background task execution configuration (SEP-1686)."),
] = Field(default_factory=lambda: TaskConfig(mode="forbidden"))

@classmethod
def from_function(
Expand Down Expand Up @@ -458,3 +470,22 @@ async def render(
except Exception as e:
logger.exception(f"Error rendering prompt {self.name}")
raise PromptError(f"Error rendering prompt {self.name}.") from e

def register_with_docket(self, docket: Docket) -> None:
"""Register this prompt with docket for background execution.

FunctionPrompt registers the underlying function, which has the user's
Depends parameters for docket to resolve.
"""
if self.task_config.mode == "forbidden":
return
docket.register(self.fn, names=[self.key]) # type: ignore[arg-type]

async def add_to_docket( # type: ignore[override]
self, docket: Docket, arguments: dict[str, Any] | None, **kwargs: Any
) -> Execution:
"""Schedule this prompt for background execution via docket.

FunctionPrompt splats the arguments dict since .fn expects **kwargs.
"""
return await docket.add(self.key, **kwargs)(**(arguments or {}))
32 changes: 27 additions & 5 deletions src/fastmcp/resources/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,13 @@
import inspect
import warnings
from collections.abc import Callable
from typing import Annotated, Any
from typing import TYPE_CHECKING, Annotated, Any

import mcp.types

if TYPE_CHECKING:
from docket import Docket
from docket.execution import Execution
import pydantic
import pydantic_core
from mcp.types import Annotations, Icon
Expand Down Expand Up @@ -277,6 +281,18 @@ def key(self) -> str:
"""The lookup key for this resource. Returns str(uri)."""
return str(self.uri)

def register_with_docket(self, docket: Docket) -> None:
"""Register this resource with docket for background execution."""
if self.task_config.mode == "forbidden":
return
docket.register(self.read, names=[self.key])

async def add_to_docket( # type: ignore[override]
self, docket: Docket, **kwargs: Any
) -> Execution:
"""Schedule this resource for background execution via docket."""
return await docket.add(self.key, **kwargs)()


class FunctionResource(Resource):
"""A resource that defers data loading by wrapping a function.
Expand All @@ -292,10 +308,6 @@ class FunctionResource(Resource):
"""

fn: Callable[..., Any]
task_config: Annotated[
TaskConfig,
Field(description="Background task execution configuration (SEP-1686)."),
] = Field(default_factory=lambda: TaskConfig(mode="forbidden"))

@classmethod
def from_function(
Expand Down Expand Up @@ -366,3 +378,13 @@ async def read(self) -> str | bytes | ResourceContent:

# Convert any value to ResourceContent
return ResourceContent.from_value(result, mime_type=self.mime_type)

def register_with_docket(self, docket: Docket) -> None:
"""Register this resource with docket for background execution.

FunctionResource registers the underlying function, which has the user's
Depends parameters for docket to resolve.
"""
if self.task_config.mode == "forbidden":
return
docket.register(self.fn, names=[self.key])
41 changes: 36 additions & 5 deletions src/fastmcp/resources/template.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,14 @@
import inspect
import re
from collections.abc import Callable
from typing import Annotated, Any
from typing import TYPE_CHECKING, Any
from urllib.parse import parse_qs, unquote

from mcp.types import Annotations, Icon

if TYPE_CHECKING:
from docket import Docket
from docket.execution import Execution
from mcp.types import ResourceTemplate as SDKResourceTemplate
from pydantic import (
Field,
Expand Down Expand Up @@ -222,15 +226,23 @@ def key(self) -> str:
"""The lookup key for this template. Returns uri_template."""
return self.uri_template

def register_with_docket(self, docket: Docket) -> None:
"""Register this template with docket for background execution."""
if self.task_config.mode == "forbidden":
return
docket.register(self.read, names=[self.key])

async def add_to_docket( # type: ignore[override]
self, docket: Docket, params: dict[str, Any], **kwargs: Any
) -> Execution:
"""Schedule this template for background execution via docket."""
return await docket.add(self.key, **kwargs)(params)


class FunctionResourceTemplate(ResourceTemplate):
"""A template for dynamically creating resources."""

fn: Callable[..., Any]
task_config: Annotated[
TaskConfig,
Field(description="Background task execution configuration (SEP-1686)."),
] = Field(default_factory=lambda: TaskConfig(mode="forbidden"))

async def create_resource(self, uri: str, params: dict[str, Any]) -> Resource:
"""Create a resource from the template with the given parameters."""
Expand Down Expand Up @@ -282,6 +294,25 @@ async def read(self, arguments: dict[str, Any]) -> str | bytes:

return result

def register_with_docket(self, docket: Docket) -> None:
"""Register this template with docket for background execution.

FunctionResourceTemplate registers the underlying function, which has the
user's Depends parameters for docket to resolve.
"""
if self.task_config.mode == "forbidden":
return
docket.register(self.fn, names=[self.key])

async def add_to_docket( # type: ignore[override]
self, docket: Docket, params: dict[str, Any], **kwargs: Any
) -> Execution:
"""Schedule this template for background execution via docket.

FunctionResourceTemplate splats the params dict since .fn expects **kwargs.
"""
return await docket.add(self.key, **kwargs)(**params)

@classmethod
def from_function(
cls,
Expand Down
76 changes: 23 additions & 53 deletions src/fastmcp/server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,9 @@
from fastmcp.prompts import Prompt
from fastmcp.prompts.prompt import FunctionPrompt, PromptResult
from fastmcp.prompts.prompt_manager import PromptManager
from fastmcp.resources.resource import FunctionResource, Resource, ResourceContent
from fastmcp.resources.resource import Resource, ResourceContent
from fastmcp.resources.resource_manager import ResourceManager
from fastmcp.resources.template import FunctionResourceTemplate, ResourceTemplate
from fastmcp.resources.template import ResourceTemplate
from fastmcp.server.auth import AuthProvider
from fastmcp.server.event_store import EventStore
from fastmcp.server.http import (
Expand Down Expand Up @@ -416,56 +416,32 @@ async def _docket_lifespan(self) -> AsyncIterator[None]:
# Store on server instance for cross-task access (FastMCPTransport)
self._docket = docket

# Register local task-enabled tools/prompts/resources with Docket
# Only function-based variants support background tasks
# Register components where task execution is not "forbidden"
# Register local task-enabled components with Docket
# Each component checks task_config internally and no-ops if forbidden
for tool in self._tool_manager._tools.values():
if (
isinstance(tool, FunctionTool)
and tool.task_config.mode != "forbidden"
):
docket.register(tool.fn, names=[tool.key])
tool.register_with_docket(docket)

for prompt in self._prompt_manager._prompts.values():
if (
isinstance(prompt, FunctionPrompt)
and prompt.task_config.mode != "forbidden"
):
# task execution requires async fn (validated at creation time)
docket.register(
cast(Callable[..., Awaitable[Any]], prompt.fn),
names=[prompt.key],
)
prompt.register_with_docket(docket)

for resource in self._resource_manager._resources.values():
if (
isinstance(resource, FunctionResource)
and resource.task_config.mode != "forbidden"
):
docket.register(resource.fn, names=[resource.key])
resource.register_with_docket(docket)

for template in self._resource_manager._templates.values():
if (
isinstance(template, FunctionResourceTemplate)
and template.task_config.mode != "forbidden"
):
docket.register(template.fn, names=[template.key])
template.register_with_docket(docket)

# Register provider components
for provider in self._providers:
try:
tasks = await provider.get_tasks()
for tool in tasks.tools:
docket.register(tool.fn, names=[tool.key])
tool.register_with_docket(docket)
for resource in tasks.resources:
docket.register(resource.fn, names=[resource.key])
resource.register_with_docket(docket)
for template in tasks.templates:
docket.register(template.fn, names=[template.key])
template.register_with_docket(docket)
for prompt in tasks.prompts:
docket.register(
cast(Callable[..., Awaitable[Any]], prompt.fn),
names=[prompt.key],
)
prompt.register_with_docket(docket)
except Exception as e:
provider_name = getattr(
provider, "server", provider
Expand Down Expand Up @@ -650,15 +626,11 @@ async def handler(req: mcp.types.ReadResourceRequest) -> mcp.types.ServerResult:

# Route to background if task metadata present and mode allows
if task_meta and task_mode != "forbidden":
# For FunctionResource/FunctionResourceTemplate, use Docket
if isinstance(
resource,
FunctionResource | FunctionResourceTemplate,
):
task_meta_dict = task_meta.model_dump(exclude_none=True)
return await handle_resource_as_task(
self, str(uri), resource, task_meta_dict
)
# Resource has task support, use Docket for background execution
task_meta_dict = task_meta.model_dump(exclude_none=True)
return await handle_resource_as_task(
self, str(uri), resource, task_meta_dict
)

# Forbidden mode: task requested but mode="forbidden"
# Raise error since resources don't have isError field
Expand Down Expand Up @@ -1480,14 +1452,12 @@ async def _call_tool_mcp(

# Route to background if task metadata present and mode allows
if task_meta and task_mode != "forbidden":
# For FunctionTool, use Docket for background execution
if isinstance(tool, FunctionTool):
task_meta_dict = task_meta.model_dump(exclude_none=True)
return await handle_tool_as_task(
self, key, arguments, task_meta_dict
)
# For ProxyTool/mounted tools, proceed with normal execution
# They will forward task metadata to their backend
# Tool has task support, use Docket for background execution
# (ProxyTool has mode="forbidden" and will not enter this branch)
task_meta_dict = task_meta.model_dump(exclude_none=True)
return await handle_tool_as_task(
self, key, arguments, task_meta_dict
)

# Forbidden mode: task requested but mode="forbidden"
# Return error result with returned_immediately=True
Expand Down
30 changes: 9 additions & 21 deletions src/fastmcp/server/tasks/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,8 @@ async def handle_tool_as_task(
await ctx.session.send_notification(notification) # type: ignore[arg-type]

# Queue function to Docket by key (result storage via execution_ttl)
# Use tool.key which matches what was registered - prefixed for mounted tools
await docket.add(
tool.key,
key=task_key,
)(**arguments)
# Use tool.add_to_docket() which handles calling conventions
await tool.add_to_docket(docket, arguments, key=task_key)

# Spawn subscription task to send status notifications (SEP-1686 optional feature)
from fastmcp.server.tasks.subscriptions import subscribe_to_task_updates
Expand Down Expand Up @@ -206,11 +203,8 @@ async def handle_prompt_as_task(
await ctx.session.send_notification(notification) # type: ignore[arg-type]

# Queue function to Docket by key (result storage via execution_ttl)
# Use prompt.key which matches what was registered - prefixed for mounted prompts
await docket.add(
prompt.key,
key=task_key,
)(**(arguments or {}))
# Use prompt.add_to_docket() which handles calling conventions
await prompt.add_to_docket(docket, arguments, key=task_key)

# Spawn subscription task to send status notifications (SEP-1686 optional feature)
from fastmcp.server.tasks.subscriptions import subscribe_to_task_updates
Expand Down Expand Up @@ -310,20 +304,14 @@ async def handle_resource_as_task(
await ctx.session.send_notification(notification) # type: ignore[arg-type]

# Queue function to Docket by key (result storage via execution_ttl)
# For templates, extract URI params and pass them to the function
from fastmcp.resources.template import FunctionResourceTemplate, match_uri_template
# Use add_to_docket() which handles calling conventions
from fastmcp.resources.template import ResourceTemplate, match_uri_template

if isinstance(resource, FunctionResourceTemplate):
if isinstance(resource, ResourceTemplate):
params = match_uri_template(uri, resource.uri_template) or {}
await docket.add(
resource.key,
key=task_key,
)(**params)
await resource.add_to_docket(docket, params, key=task_key)
else:
await docket.add(
resource.key,
key=task_key,
)()
await resource.add_to_docket(docket, key=task_key)

# Spawn subscription task to send status notifications (SEP-1686 optional feature)
from fastmcp.server.tasks.subscriptions import subscribe_to_task_updates
Expand Down
Loading