diff --git a/docs/servers/prompts.mdx b/docs/servers/prompts.mdx index ff5d488e64..1ec521e90b 100644 --- a/docs/servers/prompts.mdx +++ b/docs/servers/prompts.mdx @@ -352,10 +352,10 @@ See [Local Provider](/servers/providers/local#visibility-control) for the comple ### Async Prompts -FastMCP seamlessly supports both standard (`def`) and asynchronous (`async def`) functions as prompts. +FastMCP supports both standard (`def`) and asynchronous (`async def`) functions as prompts. Synchronous functions automatically run in a threadpool to avoid blocking the event loop. ```python -# Synchronous prompt +# Synchronous prompt (runs in threadpool) @mcp.prompt def simple_question(question: str) -> str: """Generates a simple question to ask the LLM.""" @@ -372,7 +372,7 @@ async def data_based_prompt(data_id: str) -> str: return f"Analyze this data: {data['content']}" ``` -Use `async def` when your prompt function performs I/O operations like network requests, database queries, file I/O, or external service calls. +Use `async def` when your prompt function performs I/O operations like network requests or database queries, since async is more efficient than threadpool dispatch. ### Accessing MCP Context diff --git a/docs/servers/resources.mdx b/docs/servers/resources.mdx index 23040dd2dc..a0157a68f4 100644 --- a/docs/servers/resources.mdx +++ b/docs/servers/resources.mdx @@ -270,7 +270,9 @@ For full documentation on the Context object and all its capabilities, see the [ ### Async Resources -Use `async def` for resource functions that perform I/O operations (e.g., reading from a database or network) to avoid blocking the server. +FastMCP supports both `async def` and regular `def` resource functions. Synchronous functions automatically run in a threadpool to avoid blocking the event loop. + +For I/O-bound operations, async functions are more efficient: ```python import aiofiles diff --git a/docs/servers/tools.mdx b/docs/servers/tools.mdx index ec113a739e..c4d8ee5670 100644 --- a/docs/servers/tools.mdx +++ b/docs/servers/tools.mdx @@ -140,56 +140,22 @@ mcp.add_tool(calc.multiply) # Registers with correct schema (only 'x', not 'sel ### Async Support -FastMCP is an async-first framework that seamlessly supports both asynchronous (`async def`) and synchronous (`def`) functions as tools. Async tools are preferred for I/O-bound operations to keep your server responsive. +FastMCP supports both asynchronous (`async def`) and synchronous (`def`) functions as tools. Synchronous tools automatically run in a threadpool to avoid blocking the event loop, so multiple tool calls can execute concurrently even if individual tools perform blocking operations. -While synchronous tools work seamlessly in FastMCP, they can block the event loop during execution. For CPU-intensive or potentially blocking synchronous operations, consider alternative strategies. One approach is to use `anyio` (which FastMCP already uses internally) to wrap them as async functions, for example: - -```python {1, 13} -import anyio +```python from fastmcp import FastMCP +import time mcp = FastMCP() -def cpu_intensive_task(data: str) -> str: - # Some heavy computation that could block the event loop - return processed_data - @mcp.tool -async def wrapped_cpu_task(data: str) -> str: - """CPU-intensive task wrapped to prevent blocking.""" - return await anyio.to_thread.run_sync(cpu_intensive_task, data) +def slow_tool(x: int) -> int: + """This sync function won't block other concurrent requests.""" + time.sleep(2) # Runs in threadpool, not on the event loop + return x * 2 ``` -Alternative approaches include using `asyncio.get_event_loop().run_in_executor()` or other threading techniques to manage blocking operations without impacting server responsiveness. For example, here's a recipe for using the `asyncer` library (not included in FastMCP) to create a decorator that wraps synchronous functions, courtesy of [@hsheth2](https://github.com/jlowin/fastmcp/issues/864#issuecomment-3103678258): - - -```python Decorator Recipe -import asyncer -import functools -from typing import Callable, ParamSpec, TypeVar, Awaitable - -_P = ParamSpec("_P") -_R = TypeVar("_R") - -def make_async_background(fn: Callable[_P, _R]) -> Callable[_P, Awaitable[_R]]: - @functools.wraps(fn) - async def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> _R: - return await asyncer.asyncify(fn)(*args, **kwargs) - - return wrapper -``` - -```python Using the Decorator {6} -from fastmcp import FastMCP - -mcp = FastMCP() - -@mcp.tool() -@make_async_background -def my_tool() -> None: - time.sleep(5) -``` - +For I/O-bound operations like network requests or database queries, async tools are still preferred since they're more efficient than threadpool dispatch. Use sync tools when working with synchronous libraries or for simple operations where the threading overhead doesn't matter. ## Arguments diff --git a/src/fastmcp/prompts/function_prompt.py b/src/fastmcp/prompts/function_prompt.py index 814007ea68..0174fe42b9 100644 --- a/src/fastmcp/prompts/function_prompt.py +++ b/src/fastmcp/prompts/function_prompt.py @@ -30,6 +30,7 @@ ) from fastmcp.server.tasks.config import TaskConfig from fastmcp.tools.tool import AuthCheckCallable +from fastmcp.utilities.async_utils import call_sync_fn_in_threadpool from fastmcp.utilities.json_schema import compress_schema from fastmcp.utilities.logging import get_logger from fastmcp.utilities.types import get_cached_typeadapter @@ -293,9 +294,14 @@ async def render( # self.fn is wrapped by without_injected_parameters which handles # dependency resolution internally - result = self.fn(**kwargs) - if inspect.isawaitable(result): - result = await result + if inspect.iscoroutinefunction(self.fn): + result = await self.fn(**kwargs) + else: + # Run sync functions in threadpool to avoid blocking the event loop + result = await call_sync_fn_in_threadpool(self.fn, **kwargs) + # Handle sync wrappers that return awaitables (e.g., partial(async_fn)) + if inspect.isawaitable(result): + result = await result return self.convert_result(result) except Exception as e: diff --git a/src/fastmcp/resources/function_resource.py b/src/fastmcp/resources/function_resource.py index f65cd10909..dab197ce8c 100644 --- a/src/fastmcp/resources/function_resource.py +++ b/src/fastmcp/resources/function_resource.py @@ -20,7 +20,7 @@ ) from fastmcp.server.tasks.config import TaskConfig from fastmcp.tools.tool import AuthCheckCallable -from fastmcp.utilities.types import get_fn_name +from fastmcp.utilities.async_utils import call_sync_fn_in_threadpool if TYPE_CHECKING: from docket import Docket @@ -147,8 +147,10 @@ def from_function( uri_obj = AnyUrl(metadata.uri) - # Get function name before any transformations - func_name = metadata.name or get_fn_name(fn) + # Get function name - use class name for callable objects + func_name = ( + metadata.name or getattr(fn, "__name__", None) or fn.__class__.__name__ + ) # Normalize task to TaskConfig and validate task_value = metadata.task @@ -160,6 +162,13 @@ def from_function( task_config = task_value task_config.validate_function(fn, func_name) + # if the fn is a callable class, we need to get the __call__ method from here out + if not inspect.isroutine(fn): + fn = fn.__call__ + # if the fn is a staticmethod, we need to work with the underlying function + if isinstance(fn, staticmethod): + fn = fn.__func__ + # Transform Context type annotations to Depends() for unified DI fn = transform_context_annotations(fn) @@ -187,9 +196,14 @@ async def read( """Read the resource by calling the wrapped function.""" # self.fn is wrapped by without_injected_parameters which handles # dependency resolution internally - result = self.fn() - if inspect.isawaitable(result): - result = await result + if inspect.iscoroutinefunction(self.fn): + result = await self.fn() + else: + # Run sync functions in threadpool to avoid blocking the event loop + result = await call_sync_fn_in_threadpool(self.fn) + # Handle sync wrappers that return awaitables (e.g., partial(async_fn)) + if inspect.isawaitable(result): + result = await result # If user returned another Resource, read it recursively if isinstance(result, Resource): diff --git a/src/fastmcp/server/dependencies.py b/src/fastmcp/server/dependencies.py index fac70279a6..0ecaac4aa5 100644 --- a/src/fastmcp/server/dependencies.py +++ b/src/fastmcp/server/dependencies.py @@ -29,6 +29,7 @@ from fastmcp.exceptions import FastMCPError from fastmcp.server.auth import AccessToken from fastmcp.server.http import _current_http_request +from fastmcp.utilities.async_utils import call_sync_fn_in_threadpool from fastmcp.utilities.types import find_kwarg_by_type, is_class_member_of_type if TYPE_CHECKING: @@ -471,12 +472,19 @@ def without_injected_parameters(fn: Callable[..., Any]) -> Callable[..., Any]: new_sig = inspect.Signature(user_params) # Create async wrapper that handles dependency resolution + fn_is_async = inspect.iscoroutinefunction(fn) + async def wrapper(**user_kwargs: Any) -> Any: async with resolve_dependencies(fn, user_kwargs) as resolved_kwargs: - result = fn(**resolved_kwargs) - if inspect.isawaitable(result): - result = await result - return result + if fn_is_async: + return await fn(**resolved_kwargs) + else: + # Run sync functions in threadpool to avoid blocking the event loop + result = await call_sync_fn_in_threadpool(fn, **resolved_kwargs) + # Handle sync wrappers that return awaitables (e.g., partial(async_fn)) + if inspect.isawaitable(result): + result = await result + return result # Set wrapper metadata (only parameter annotations, not return type) wrapper.__signature__ = new_sig # type: ignore[attr-defined] diff --git a/src/fastmcp/tools/function_tool.py b/src/fastmcp/tools/function_tool.py index 69d97be7a0..7bf0d1f723 100644 --- a/src/fastmcp/tools/function_tool.py +++ b/src/fastmcp/tools/function_tool.py @@ -21,9 +21,7 @@ import fastmcp from fastmcp.decorators import resolve_task_config -from fastmcp.server.dependencies import ( - without_injected_parameters, -) +from fastmcp.server.dependencies import without_injected_parameters from fastmcp.server.tasks.config import TaskConfig from fastmcp.tools.function_parsing import ParsedFunction, _is_object_schema from fastmcp.tools.tool import ( @@ -32,6 +30,7 @@ ToolResult, ToolResultSerializerType, ) +from fastmcp.utilities.async_utils import call_sync_fn_in_threadpool from fastmcp.utilities.types import ( NotSet, NotSetT, @@ -237,9 +236,18 @@ async def run(self, arguments: dict[str, Any]) -> ToolResult: """Run the tool with arguments.""" wrapper_fn = without_injected_parameters(self.fn) type_adapter = get_cached_typeadapter(wrapper_fn) - result = type_adapter.validate_python(arguments) - if inspect.isawaitable(result): - result = await result + + if inspect.iscoroutinefunction(wrapper_fn): + # Async function: validate_python returns a coroutine + result = await type_adapter.validate_python(arguments) + else: + # Sync function: run in threadpool to avoid blocking the event loop + result = await call_sync_fn_in_threadpool( + type_adapter.validate_python, arguments + ) + # Handle sync wrappers that return awaitables (e.g., partial(async_fn)) + if inspect.isawaitable(result): + result = await result return self.convert_result(result) diff --git a/src/fastmcp/utilities/async_utils.py b/src/fastmcp/utilities/async_utils.py index b69077c9d2..aa8caafc70 100644 --- a/src/fastmcp/utilities/async_utils.py +++ b/src/fastmcp/utilities/async_utils.py @@ -1,13 +1,26 @@ """Async utilities for FastMCP.""" -from collections.abc import Awaitable -from typing import Literal, TypeVar, overload +import functools +from collections.abc import Awaitable, Callable +from typing import Any, Literal, TypeVar, overload import anyio +from anyio.to_thread import run_sync as run_sync_in_threadpool T = TypeVar("T") +async def call_sync_fn_in_threadpool( + fn: Callable[..., Any], *args: Any, **kwargs: Any +) -> Any: + """Call a sync function in a threadpool to avoid blocking the event loop. + + Uses anyio.to_thread.run_sync which properly propagates contextvars, + making this safe for functions that depend on context (like dependency injection). + """ + return await run_sync_in_threadpool(functools.partial(fn, *args, **kwargs)) + + @overload async def gather( *awaitables: Awaitable[T], diff --git a/tests/prompts/test_prompt.py b/tests/prompts/test_prompt.py index f1027afb39..30c8336e7c 100644 --- a/tests/prompts/test_prompt.py +++ b/tests/prompts/test_prompt.py @@ -550,3 +550,57 @@ def test_promptresult_to_mcp(self): assert len(mcp_result.messages) == 2 assert mcp_result.description == "Test" assert mcp_result.meta == {"key": "value"} + + +class TestPromptCallableAndConcurrency: + """Test prompts with callable objects and concurrent execution.""" + + async def test_callable_object_sync(self): + """Test that callable objects with sync __call__ work.""" + + class MyPrompt: + def __init__(self, greeting: str): + self.greeting = greeting + + def __call__(self) -> str: + return f"{self.greeting}, world!" + + prompt = Prompt.from_function(MyPrompt("Hello")) + result = await prompt.render() + assert result.messages == [Message("Hello, world!")] + + async def test_callable_object_async(self): + """Test that callable objects with async __call__ work.""" + + class AsyncPrompt: + def __init__(self, greeting: str): + self.greeting = greeting + + async def __call__(self) -> str: + return f"async {self.greeting}!" + + prompt = Prompt.from_function(AsyncPrompt("Hello")) + result = await prompt.render() + assert result.messages == [Message("async Hello!")] + + async def test_sync_prompt_runs_concurrently(self): + """Test that sync prompts run in threadpool and don't block each other.""" + import asyncio + import threading + + num_calls = 3 + barrier = threading.Barrier(num_calls, timeout=0.5) + + def concurrent_prompt() -> str: + barrier.wait() + return "done" + + prompt = Prompt.from_function(concurrent_prompt) + + # Run concurrent renders - will raise BrokenBarrierError if not concurrent + results = await asyncio.gather( + prompt.render(), + prompt.render(), + prompt.render(), + ) + assert all(r.messages == [Message("done")] for r in results) diff --git a/tests/resources/test_function_resources.py b/tests/resources/test_function_resources.py index 0c1e8ce3f2..8c44221c98 100644 --- a/tests/resources/test_function_resources.py +++ b/tests/resources/test_function_resources.py @@ -276,3 +276,59 @@ def test_none_meta(self): mcp_content = rc.to_mcp_resource_contents("resource://test") assert mcp_content.meta is None + + +class TestFunctionResourceCallable: + """Test FunctionResource with callable objects.""" + + async def test_callable_object_sync(self): + """Test that callable objects with sync __call__ work.""" + + class MyResource: + def __init__(self, value: str): + self.value = value + + def __call__(self) -> str: + return f"value: {self.value}" + + resource = FunctionResource.from_function(MyResource("test"), uri="fn://test") + result = await resource.read() + assert result == "value: test" + + async def test_callable_object_async(self): + """Test that callable objects with async __call__ work.""" + + class AsyncResource: + def __init__(self, value: str): + self.value = value + + async def __call__(self) -> str: + return f"async value: {self.value}" + + resource = FunctionResource.from_function( + AsyncResource("test"), uri="fn://test" + ) + result = await resource.read() + assert result == "async value: test" + + async def test_sync_resource_runs_concurrently(self): + """Test that sync resources run in threadpool and don't block each other.""" + import asyncio + import threading + + num_calls = 3 + barrier = threading.Barrier(num_calls, timeout=0.5) + + def concurrent_resource() -> str: + barrier.wait() + return "done" + + resource = FunctionResource.from_function(concurrent_resource, uri="fn://test") + + # Run concurrent reads - will raise BrokenBarrierError if not concurrent + results = await asyncio.gather( + resource.read(), + resource.read(), + resource.read(), + ) + assert results == ["done", "done", "done"] diff --git a/tests/tools/test_tool.py b/tests/tools/test_tool.py index 8f9c153060..7b579076dd 100644 --- a/tests/tools/test_tool.py +++ b/tests/tools/test_tool.py @@ -1921,3 +1921,106 @@ def test_tool_execution_forbidden_mode(self): mcp_tool = tool.to_mcp_tool() assert mcp_tool.execution is not None assert mcp_tool.execution.taskSupport == "forbidden" + + +class TestToolCallable: + """Test tools with callable objects.""" + + async def test_callable_object_sync(self): + """Test that callable objects with sync __call__ work.""" + + class MyTool: + def __init__(self, multiplier: int): + self.multiplier = multiplier + + def __call__(self, x: int) -> int: + return x * self.multiplier + + tool = Tool.from_function(MyTool(3)) + result = await tool.run({"x": 5}) + assert result.content == [TextContent(type="text", text="15")] + + async def test_callable_object_async(self): + """Test that callable objects with async __call__ work.""" + + class AsyncTool: + def __init__(self, multiplier: int): + self.multiplier = multiplier + + async def __call__(self, x: int) -> int: + return x * self.multiplier + + tool = Tool.from_function(AsyncTool(4)) + result = await tool.run({"x": 5}) + assert result.content == [TextContent(type="text", text="20")] + + +class TestSyncToolConcurrency: + """Tests for concurrent execution of sync tools without blocking the event loop.""" + + async def test_sync_tools_run_concurrently(self): + """Test that sync tools run in threadpool and don't block each other. + + Uses a threading barrier to prove concurrent execution: all calls must + reach the barrier simultaneously for any to proceed. If they ran + sequentially, only one would reach the barrier and it would timeout. + """ + import asyncio + import threading + + num_calls = 3 + # Barrier requires all threads to arrive before any proceed + # Short timeout since concurrent threads should arrive within milliseconds + barrier = threading.Barrier(num_calls, timeout=0.5) + + def concurrent_tool(x: int) -> int: + """Tool that proves concurrency via barrier synchronization.""" + # If calls run sequentially, only 1 thread reaches barrier and times out + # If calls run concurrently, all 3 reach barrier and proceed + barrier.wait() + return x * 2 + + tool = Tool.from_function(concurrent_tool) + + # Run concurrent calls - will raise BrokenBarrierError if not concurrent + results = await asyncio.gather( + tool.run({"x": 1}), + tool.run({"x": 2}), + tool.run({"x": 3}), + ) + + # Verify results + assert [r.content for r in results] == [ + [TextContent(type="text", text="2")], + [TextContent(type="text", text="4")], + [TextContent(type="text", text="6")], + ] + + async def test_sync_tool_with_context_runs_concurrently(self): + """Test that sync tools with Context dependency also run concurrently.""" + import asyncio + import threading + + from fastmcp import Context, FastMCP + + num_calls = 3 + barrier = threading.Barrier(num_calls, timeout=0.5) + + mcp = FastMCP("test") + + @mcp.tool + def ctx_tool(x: int, ctx: Context) -> str: + """A sync tool with context that uses barrier to prove concurrency.""" + barrier.wait() + return f"{ctx.fastmcp.name}:{x}" + + # Run concurrent calls through the server interface (which sets up Context) + results = await asyncio.gather( + mcp.call_tool("ctx_tool", {"x": 1}), + mcp.call_tool("ctx_tool", {"x": 2}), + mcp.call_tool("ctx_tool", {"x": 3}), + ) + + # Verify results + for i, result in enumerate(results, 1): + assert result.content == [TextContent(type="text", text=f"test:{i}")]