Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
191 changes: 90 additions & 101 deletions src/fastmcp/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
PaginatedRequestParams,
TaskStatusNotification,
)
from pydantic import AnyUrl
from pydantic import AnyUrl, RootModel

import fastmcp
from fastmcp.client.elicitation import ElicitationHandler, create_elicitation_callback
Expand Down Expand Up @@ -907,41 +907,44 @@ async def _read_resource_as_task(
ResourceTask: Future-like object for accessing task status and results
"""
# Per SEP-1686 final spec: client sends only ttl, server generates taskId
# Read resource with task metadata (no taskId sent)
result = await self.read_resource_mcp(
uri=uri,
meta={
"modelcontextprotocol.io/task": {
"ttl": ttl,
}
},
if isinstance(uri, str):
uri = AnyUrl(uri)

request = mcp.types.ReadResourceRequest(
params=mcp.types.ReadResourceRequestParams(
uri=uri,
task=mcp.types.TaskMetadata(ttl=ttl),
)
)

# Check if server accepted background execution
# If response includes task metadata with taskId, server accepted background mode
# If response includes returned_immediately=True, server declined and executed sync
task_meta = (result.meta or {}).get("modelcontextprotocol.io/task", {})
if task_meta.get("taskId"):
# Background execution accepted - extract server-generated taskId
server_task_id = task_meta["taskId"]
# Track this task ID for list_tasks()
# Server returns CreateTaskResult (task accepted) or ReadResourceResult (graceful degradation)
TaskResponseUnion = RootModel[
mcp.types.CreateTaskResult | mcp.types.ReadResourceResult
]
wrapped_result = await self.session.send_request(
request=request, # type: ignore[arg-type]
result_type=TaskResponseUnion, # type: ignore[arg-type]
)
raw_result = wrapped_result.root

if isinstance(raw_result, mcp.types.CreateTaskResult):
# Task was accepted - extract task info from CreateTaskResult
server_task_id = raw_result.task.taskId
self._submitted_task_ids.add(server_task_id)

# Create task object
task_obj = ResourceTask(
self, server_task_id, uri=str(uri), immediate_result=None
)

# Register for notification routing
self._task_registry[server_task_id] = weakref.ref(task_obj) # type: ignore[assignment]

return task_obj
else:
# Server declined background execution (graceful degradation)
# Use a synthetic task ID for the immediate result
# Graceful degradation - server returned ReadResourceResult
synthetic_task_id = task_id or str(uuid.uuid4())
return ResourceTask(
self, synthetic_task_id, uri=str(uri), immediate_result=result.contents
self,
synthetic_task_id,
uri=str(uri),
immediate_result=raw_result.contents,
)

# async def subscribe_resource(self, uri: AnyUrl | str) -> None:
Expand Down Expand Up @@ -1119,42 +1122,51 @@ async def _get_prompt_as_task(
PromptTask: Future-like object for accessing task status and results
"""
# Per SEP-1686 final spec: client sends only ttl, server generates taskId
# Call prompt with task metadata (no taskId sent)
result = await self.get_prompt_mcp(
name=name,
arguments=arguments or {},
meta={
"modelcontextprotocol.io/task": {
"ttl": ttl,
}
},
# Serialize arguments for MCP protocol
serialized_arguments: dict[str, str] | None = None
if arguments:
serialized_arguments = {}
for key, value in arguments.items():
if isinstance(value, str):
serialized_arguments[key] = value
else:
serialized_arguments[key] = pydantic_core.to_json(value).decode(
"utf-8"
)

request = mcp.types.GetPromptRequest(
params=mcp.types.GetPromptRequestParams(
name=name,
arguments=serialized_arguments,
task=mcp.types.TaskMetadata(ttl=ttl),
)
)

# Server returns CreateTaskResult (task accepted) or GetPromptResult (graceful degradation)
TaskResponseUnion = RootModel[
mcp.types.CreateTaskResult | mcp.types.GetPromptResult
]
wrapped_result = await self.session.send_request(
request=request, # type: ignore[arg-type]
result_type=TaskResponseUnion, # type: ignore[arg-type]
)
raw_result = wrapped_result.root

# Check if server accepted background execution
# If response includes task metadata with taskId, server accepted background mode
# If response includes returned_immediately=True, server declined and executed sync
task_meta = (result.meta or {}).get("modelcontextprotocol.io/task", {})
if task_meta.get("taskId"):
# Background execution accepted - extract server-generated taskId
server_task_id = task_meta["taskId"]
# Track this task ID for list_tasks()
if isinstance(raw_result, mcp.types.CreateTaskResult):
# Task was accepted - extract task info from CreateTaskResult
server_task_id = raw_result.task.taskId
self._submitted_task_ids.add(server_task_id)

# Create task object
task_obj = PromptTask(
self, server_task_id, prompt_name=name, immediate_result=None
)

# Register for notification routing
self._task_registry[server_task_id] = weakref.ref(task_obj) # type: ignore[assignment]

return task_obj
else:
# Server declined background execution (graceful degradation)
# Use a synthetic task ID for the immediate result
# Graceful degradation - server returned GetPromptResult
synthetic_task_id = task_id or str(uuid.uuid4())
return PromptTask(
self, synthetic_task_id, prompt_name=name, immediate_result=result
self, synthetic_task_id, prompt_name=name, immediate_result=raw_result
)

# --- Completion ---
Expand Down Expand Up @@ -1284,33 +1296,13 @@ async def call_tool_mcp(
if isinstance(timeout, int | float):
timeout = datetime.timedelta(seconds=float(timeout))

# For task submissions, use send_request to bypass SDK validation
# Task acknowledgments don't have structured content, which would fail validation
if meta and "modelcontextprotocol.io/task" in meta:
task_dict = meta.get("modelcontextprotocol.io/task")
request = mcp.types.CallToolRequest(
params=mcp.types.CallToolRequestParams(
name=name,
arguments=arguments,
task=mcp.types.TaskMetadata(**task_dict)
if task_dict
else None, # SEP-1686: task as direct param (spec-compliant)
)
)
result = await self.session.send_request(
request=request, # type: ignore[arg-type]
result_type=mcp.types.CallToolResult,
request_read_timeout_seconds=timeout, # type: ignore[arg-type]
progress_callback=progress_handler or self._progress_handler,
)
else:
result = await self.session.call_tool(
name=name,
arguments=arguments,
read_timeout_seconds=timeout, # ty: ignore[invalid-argument-type]
progress_callback=progress_handler or self._progress_handler,
meta=meta,
)
result = await self.session.call_tool(
name=name,
arguments=arguments,
read_timeout_seconds=timeout, # ty: ignore[invalid-argument-type]
progress_callback=progress_handler or self._progress_handler,
meta=meta,
)
return result

async def _parse_call_tool_result(
Expand Down Expand Up @@ -1470,42 +1462,39 @@ async def _call_tool_as_task(
ToolTask: Future-like object for accessing task status and results
"""
# Per SEP-1686 final spec: client sends only ttl, server generates taskId
# Call tool with task metadata (no taskId sent)
result = await self.call_tool_mcp(
name=name,
arguments=arguments or {},
meta={
"modelcontextprotocol.io/task": {
"ttl": ttl,
}
},
# Build request with task metadata
request = mcp.types.CallToolRequest(
params=mcp.types.CallToolRequestParams(
name=name,
arguments=arguments or {},
task=mcp.types.TaskMetadata(ttl=ttl),
)
)

# Server returns CreateTaskResult (task accepted) or CallToolResult (graceful degradation)
# Use RootModel with Union to handle both response types (SDK calls model_validate)
TaskResponseUnion = RootModel[
mcp.types.CreateTaskResult | mcp.types.CallToolResult
]
wrapped_result = await self.session.send_request(
request=request, # type: ignore[arg-type]
result_type=TaskResponseUnion, # type: ignore[arg-type]
)
raw_result = wrapped_result.root

# Check if server accepted background execution
# If response includes task metadata with taskId, server accepted background mode
# If response includes returned_immediately=True, server declined and executed sync
task_meta = (result.meta or {}).get("modelcontextprotocol.io/task", {})
if task_meta.get("taskId"):
# Background execution accepted - extract server-generated taskId
server_task_id = task_meta["taskId"]
# Track this task ID for list_tasks()
if isinstance(raw_result, mcp.types.CreateTaskResult):
# Task was accepted - extract task info from CreateTaskResult
server_task_id = raw_result.task.taskId
self._submitted_task_ids.add(server_task_id)

# Create task object
task_obj = ToolTask(
self, server_task_id, tool_name=name, immediate_result=None
)

# Register for notification routing
self._task_registry[server_task_id] = weakref.ref(task_obj) # type: ignore[assignment]

return task_obj
else:
# Server declined background execution (graceful degradation)
# or returned_immediately=True - executed synchronously
# Wrap the immediate result
parsed_result = await self._parse_call_tool_result(name, result)
# Use a synthetic task ID for the immediate result
# Graceful degradation - server returned CallToolResult
parsed_result = await self._parse_call_tool_result(name, raw_result)
synthetic_task_id = task_id or str(uuid.uuid4())
return ToolTask(
self, synthetic_task_id, tool_name=name, immediate_result=parsed_result
Expand Down
83 changes: 48 additions & 35 deletions src/fastmcp/prompts/prompt.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,13 @@ async def render(
"""
raise NotImplementedError("Subclasses must implement render()")

def convert_result(self, raw_value: Any) -> PromptResult:
"""Convert a raw return value to PromptResult.

Subclasses should override this to handle their specific conversion logic.
"""
raise NotImplementedError("Subclasses must implement convert_result()")

async def _render(
self,
arguments: dict[str, Any] | None = None,
Expand Down Expand Up @@ -432,45 +439,51 @@ async def render(
if inspect.isawaitable(result):
result = await result

# Validate messages
if not isinstance(result, list | tuple):
result = [result]

# Convert result to messages
messages: list[PromptMessage] = []
for msg in result:
try:
if isinstance(msg, PromptMessage):
messages.append(msg)
elif isinstance(msg, str):
messages.append(
PromptMessage(
role="user",
content=TextContent(type="text", text=msg),
)
)
else:
content = pydantic_core.to_json(msg, fallback=str).decode()
messages.append(
PromptMessage(
role="user",
content=TextContent(type="text", text=content),
)
)
except Exception as e:
raise PromptError(
"Could not convert prompt result to message."
) from e

return PromptResult(
messages=messages,
description=self.description,
meta=self.meta,
)
return self.convert_result(result)
except Exception as e:
logger.exception(f"Error rendering prompt {self.name}")
raise PromptError(f"Error rendering prompt {self.name}.") from e

def convert_result(self, raw_value: Any) -> PromptResult:
"""Convert a raw return value to PromptResult.

This handles the same conversion logic as render(), but works on
already-executed raw values (e.g., from Docket background execution).
"""
# Normalize to list
if not isinstance(raw_value, list | tuple):
raw_value = [raw_value]

# Convert result to messages
messages: list[PromptMessage] = []
for msg in raw_value:
try:
if isinstance(msg, PromptMessage):
messages.append(msg)
elif isinstance(msg, str):
messages.append(
PromptMessage(
role="user",
content=TextContent(type="text", text=msg),
)
)
else:
content = pydantic_core.to_json(msg, fallback=str).decode()
messages.append(
PromptMessage(
role="user",
content=TextContent(type="text", text=content),
)
)
except Exception as e:
raise PromptError("Could not convert prompt result to message.") from e

return PromptResult(
messages=messages,
description=self.description,
meta=self.meta,
)

def register_with_docket(self, docket: Docket) -> None:
"""Register this prompt with docket for background execution.

Expand Down
11 changes: 9 additions & 2 deletions src/fastmcp/resources/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,8 +376,15 @@ async def read(self) -> str | bytes | ResourceContent:
if isinstance(result, Resource):
return await result.read()

# Convert any value to ResourceContent
return ResourceContent.from_value(result, mime_type=self.mime_type)
return self.convert_result(result)

def convert_result(self, raw_value: Any) -> ResourceContent:
"""Convert a raw return value to ResourceContent.

This handles the same conversion logic as read(), but works on
already-executed raw values (e.g., from Docket background execution).
"""
return ResourceContent.from_value(raw_value, mime_type=self.mime_type)

def register_with_docket(self, docket: Docket) -> None:
"""Register this resource with docket for background execution.
Expand Down
Loading