Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 26 additions & 6 deletions src/fastmcp/resources/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
import base64
import inspect
from collections.abc import Callable
from typing import TYPE_CHECKING, Annotated, Any, ClassVar
from dataclasses import replace
from typing import TYPE_CHECKING, Annotated, Any, ClassVar, overload

import mcp.types

Expand All @@ -27,7 +28,7 @@
from typing_extensions import Self

from fastmcp.server.dependencies import without_injected_parameters
from fastmcp.server.tasks.config import TaskConfig
from fastmcp.server.tasks.config import TaskConfig, TaskMeta
from fastmcp.utilities.components import FastMCPComponent
from fastmcp.utilities.types import get_fn_name

Expand Down Expand Up @@ -300,23 +301,42 @@ def convert_result(self, raw_value: Any) -> ResourceResult:
# ResourceResult.__init__ handles all normalization
return ResourceResult(raw_value)

async def _read(self) -> ResourceResult | mcp.types.CreateTaskResult:
@overload
async def _read(self, task_meta: None = None) -> ResourceResult: ...

@overload
async def _read(self, task_meta: TaskMeta) -> mcp.types.CreateTaskResult: ...

async def _read(
self, task_meta: TaskMeta | None = None
) -> ResourceResult | mcp.types.CreateTaskResult:
"""Server entry point that handles task routing.

This allows ANY Resource subclass to support background execution by setting
task_config.mode to "supported" or "required". The server calls this
method instead of read() directly.

Args:
task_meta: If provided, execute as a background task and return
CreateTaskResult. If None (default), execute synchronously and
return ResourceResult.

Returns:
ResourceResult when task_meta is None.
CreateTaskResult when task_meta is provided.

Subclasses can override this to customize task routing behavior.
For example, FastMCPProviderResource overrides to delegate to child
middleware without submitting to Docket.
"""
from fastmcp.server.dependencies import _docket_fn_key
from fastmcp.server.tasks.routing import check_background_task

key = _docket_fn_key.get() or self.key
# Enrich task_meta with fn_key if not already set (fallback for programmatic API)
if task_meta is not None and task_meta.fn_key is None:
task_meta = replace(task_meta, fn_key=self.key)

task_result = await check_background_task(
component=self, task_type="resource", key=key
component=self, task_type="resource", arguments=None, task_meta=task_meta
)
if task_result:
return task_result
Expand Down
73 changes: 57 additions & 16 deletions src/fastmcp/resources/template.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
import inspect
import re
from collections.abc import Callable
from typing import TYPE_CHECKING, Any, ClassVar
from dataclasses import replace
from typing import TYPE_CHECKING, Any, ClassVar, overload
from urllib.parse import parse_qs, unquote

import mcp.types
Expand All @@ -23,7 +24,7 @@

from fastmcp.resources.resource import Resource, ResourceResult
from fastmcp.server.dependencies import without_injected_parameters
from fastmcp.server.tasks.config import TaskConfig
from fastmcp.server.tasks.config import TaskConfig, TaskMeta
from fastmcp.utilities.components import FastMCPComponent
from fastmcp.utilities.json_schema import compress_schema
from fastmcp.utilities.types import get_cached_typeadapter
Expand Down Expand Up @@ -177,28 +178,48 @@ def convert_result(self, raw_value: Any) -> ResourceResult:
# ResourceResult.__init__ handles all normalization
return ResourceResult(raw_value)

@overload
async def _read(
self, uri: str, params: dict[str, Any]
self, uri: str, params: dict[str, Any], task_meta: None = None
) -> ResourceResult: ...

@overload
async def _read(
self, uri: str, params: dict[str, Any], task_meta: TaskMeta
) -> mcp.types.CreateTaskResult: ...

async def _read(
self, uri: str, params: dict[str, Any], task_meta: TaskMeta | None = None
) -> ResourceResult | mcp.types.CreateTaskResult:
"""Server entry point that handles task routing.

This allows ANY ResourceTemplate subclass to support background execution
by setting task_config.mode to "supported" or "required". The server calls
this method instead of create_resource()/read() directly.

Args:
uri: The concrete URI being read
params: Template parameters extracted from the URI
task_meta: If provided, execute as a background task and return
CreateTaskResult. If None (default), execute synchronously and
return ResourceResult.

Returns:
ResourceResult when task_meta is None.
CreateTaskResult when task_meta is provided.

Subclasses can override this to customize task routing behavior.
For example, FastMCPProviderResourceTemplate overrides to delegate to child
middleware without submitting to Docket.
"""
from fastmcp.server.dependencies import _docket_fn_key
from fastmcp.server.tasks.routing import check_background_task

# Templates need pattern check: only use contextvar if it contains '{'
key = _docket_fn_key.get()
if not key or "{" not in key:
key = self.key
# Enrich task_meta with fn_key if not already set (fallback for programmatic API)
if task_meta is not None and task_meta.fn_key is None:
task_meta = replace(task_meta, fn_key=self.key)

task_result = await check_background_task(
component=self, task_type="template", key=key, arguments=params
component=self, task_type="template", arguments=params, task_meta=task_meta
)
if task_result:
return task_result
Expand Down Expand Up @@ -294,23 +315,43 @@ class FunctionResourceTemplate(ResourceTemplate):

fn: Callable[..., Any]

@overload
async def _read(
self, uri: str, params: dict[str, Any]
self, uri: str, params: dict[str, Any], task_meta: None = None
) -> ResourceResult: ...

@overload
async def _read(
self, uri: str, params: dict[str, Any], task_meta: TaskMeta
) -> mcp.types.CreateTaskResult: ...

async def _read(
self, uri: str, params: dict[str, Any], task_meta: TaskMeta | None = None
) -> ResourceResult | mcp.types.CreateTaskResult:
"""Optimized server entry point that skips ephemeral resource creation.

For FunctionResourceTemplate, we can call read() directly instead of
creating a temporary resource, which is more efficient.

Args:
uri: The concrete URI being read
params: Template parameters extracted from the URI
task_meta: If provided, execute as a background task and return
CreateTaskResult. If None (default), execute synchronously and
return ResourceResult.

Returns:
ResourceResult when task_meta is None.
CreateTaskResult when task_meta is provided.
"""
from fastmcp.server.dependencies import _docket_fn_key
from fastmcp.server.tasks.routing import check_background_task

# Templates need pattern check: only use contextvar if it contains '{'
key = _docket_fn_key.get()
if not key or "{" not in key:
key = self.key
# Enrich task_meta with fn_key if not already set (fallback for programmatic API)
if task_meta is not None and task_meta.fn_key is None:
task_meta = replace(task_meta, fn_key=self.key)

task_result = await check_background_task(
component=self, task_type="template", key=key, arguments=params
component=self, task_type="template", arguments=params, task_meta=task_meta
)
if task_result:
return task_result
Expand Down
104 changes: 66 additions & 38 deletions src/fastmcp/server/providers/fastmcp_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
import re
from collections.abc import AsyncIterator, Sequence
from contextlib import asynccontextmanager
from typing import TYPE_CHECKING, Any
from dataclasses import replace
from typing import TYPE_CHECKING, Any, overload

import mcp.types
from mcp.types import AnyUrl
Expand Down Expand Up @@ -85,6 +86,20 @@ def wrap(cls, server: Any, tool: Tool) -> FastMCPProviderTool:
task_config=tool.task_config,
)

@overload
async def _run(
self,
arguments: dict[str, Any],
task_meta: None = None,
) -> ToolResult: ...

@overload
async def _run(
self,
arguments: dict[str, Any],
task_meta: TaskMeta,
) -> mcp.types.CreateTaskResult: ...

async def _run(
self,
arguments: dict[str, Any],
Expand All @@ -94,7 +109,15 @@ async def _run(

Passes task_meta through to the child server so it can handle
backgrounding appropriately.

Enriches fn_key with self.key (the parent's namespaced tool name) so that
when the child tool delegates to Docket, it uses the correct lookup key
that was registered via get_tasks().
"""
# Enrich fn_key with parent's key before delegating to child
if task_meta is not None and task_meta.fn_key is None:
task_meta = replace(task_meta, fn_key=self.key)

return await self._server.call_tool(
self._original_name, arguments, task_meta=task_meta
)
Expand Down Expand Up @@ -148,19 +171,29 @@ def wrap(cls, server: Any, resource: Resource) -> FastMCPProviderResource:
task_config=resource.task_config,
)

async def _read(self) -> ResourceResult | mcp.types.CreateTaskResult:
"""Skip task routing - delegate to child server's read_resource().
@overload
async def _read(self, task_meta: None = None) -> ResourceResult: ...

The actual underlying resource will check _task_metadata contextvar and
submit to Docket if appropriate. This wrapper just passes through.
@overload
async def _read(self, task_meta: TaskMeta) -> mcp.types.CreateTaskResult: ...

Note: The _docket_fn_key contextvar is intentionally NOT updated here.
The parent set it to the full namespaced key (e.g., data://c/gc/value)
which is what the function is registered under in Docket. All provider
layers pass this through unchanged so the eventual resource._read()
uses the correct Docket lookup key.
async def _read(
self, task_meta: TaskMeta | None = None
) -> ResourceResult | mcp.types.CreateTaskResult:
"""Delegate to child server's read_resource() with task_meta.

Passes task_meta through to the child server so it can handle
backgrounding appropriately.

Enriches fn_key with self.key (the parent's namespaced URI) so that
when the child resource delegates to Docket, it uses the correct
lookup key that was registered via get_tasks().
"""
return await self._server.read_resource(self._original_uri)
# Enrich fn_key with parent's URI before delegating to child
if task_meta is not None and task_meta.fn_key is None:
task_meta = TaskMeta(ttl=task_meta.ttl, fn_key=self.key)

return await self._server.read_resource(self._original_uri, task_meta=task_meta)


class FastMCPProviderPrompt(Prompt):
Expand Down Expand Up @@ -277,41 +310,36 @@ async def create_resource(self, uri: str, params: dict[str, Any]) -> Resource:
mime_type=self.mime_type,
)

@overload
async def _read(
self, uri: str, params: dict[str, Any]
) -> ResourceResult | mcp.types.CreateTaskResult:
"""Delegate to child server's read_resource().
self, uri: str, params: dict[str, Any], task_meta: None = None
) -> ResourceResult: ...

@overload
async def _read(
self, uri: str, params: dict[str, Any], task_meta: TaskMeta
) -> mcp.types.CreateTaskResult: ...

Skips task routing at this layer - the child's template._read() will
check _task_metadata contextvar and submit to Docket if appropriate.
async def _read(
self, uri: str, params: dict[str, Any], task_meta: TaskMeta | None = None
) -> ResourceResult | mcp.types.CreateTaskResult:
"""Delegate to child server's read_resource() with task_meta.

Sets _docket_fn_key to self.uri_template (the transformed pattern) so that
when the child template's _read() submits to Docket, it uses the correct
key that matches what was registered via TransformingProvider.get_tasks().
Passes task_meta through to the child server so it can handle
backgrounding appropriately.

Only sets _docket_fn_key if not already set - in nested mounts, the
outermost wrapper sets the key and inner wrappers preserve it.
Enriches fn_key with self.key (the parent's namespaced template pattern)
so that when the child template delegates to Docket, it uses the correct
lookup key that was registered via get_tasks().
"""
from fastmcp.server.dependencies import _docket_fn_key

# Expand the original template with params to get internal URI
original_uri = _expand_uri_template(self._original_uri_template or "", params)

# Set _docket_fn_key to the template pattern, but only if the current
# value isn't already a template pattern (contains '{').
# - Server sets concrete URI (e.g., "item://c/gc/42") - no '{', override it
# - Outer wrapper sets pattern (e.g., "item://c/gc/{id}") - has '{', keep it
# In nested mounts (parent→child→grandchild), the outermost wrapper
# has the fully-transformed pattern that matches Docket registration.
existing_key = _docket_fn_key.get()
key_token = None
if not existing_key or "{" not in existing_key:
key_token = _docket_fn_key.set(self.key)
try:
return await self._server.read_resource(original_uri)
finally:
if key_token is not None:
_docket_fn_key.reset(key_token)
# Enrich fn_key with parent's namespaced template pattern before delegating
if task_meta is not None and task_meta.fn_key is None:
task_meta = replace(task_meta, fn_key=self.key)

return await self._server.read_resource(original_uri, task_meta=task_meta)

async def read(self, arguments: dict[str, Any]) -> str | bytes | ResourceResult:
"""Read the resource content for background task execution.
Expand Down
Loading