diff --git a/docs/development/v3-notes/v3-features.mdx b/docs/development/v3-notes/v3-features.mdx
index 95896a8e4e..7e49122a39 100644
--- a/docs/development/v3-notes/v3-features.mdx
+++ b/docs/development/v3-notes/v3-features.mdx
@@ -267,6 +267,59 @@ fastmcp dev server.py # Includes --reload by default
---
+## Component Authorization
+
+v3.0 introduces callable-based authorization for tools, resources, and prompts (`src/fastmcp/server/auth/authorization.py`).
+
+**Component-level auth**:
+
+```python
+from fastmcp import FastMCP
+from fastmcp.server.auth import require_auth, require_scopes
+
+mcp = FastMCP()
+
+@mcp.tool(auth=require_auth)
+def protected_tool(): ...
+
+@mcp.resource("data://secret", auth=require_scopes("read"))
+def secret_data(): ...
+
+@mcp.prompt(auth=require_scopes("admin"))
+def admin_prompt(): ...
+```
+
+**Server-wide auth via middleware**:
+
+```python
+from fastmcp.server.middleware import AuthMiddleware
+from fastmcp.server.auth import require_auth, restrict_tag
+
+# Require auth for all components
+mcp = FastMCP(middleware=[AuthMiddleware(auth=require_auth)])
+
+# Tag-based restrictions
+mcp = FastMCP(middleware=[
+ AuthMiddleware(auth=restrict_tag("admin", scopes=["admin"]))
+])
+```
+
+Built-in checks:
+- `require_auth`: Requires any valid token
+- `require_scopes(*scopes)`: Requires specific OAuth scopes
+- `restrict_tag(tag, scopes)`: Requires scopes only for tagged components
+
+Custom checks receive `AuthContext` with `token` and `component`:
+
+```python
+def custom_check(ctx: AuthContext) -> bool:
+ return ctx.token is not None and "admin" in ctx.token.scopes
+```
+
+STDIO transport bypasses all auth checks (no OAuth concept).
+
+---
+
## Deprecated Features
These emit deprecation warnings but continue to work.
diff --git a/docs/docs.json b/docs/docs.json
index 8105d89245..2649240469 100644
--- a/docs/docs.json
+++ b/docs/docs.json
@@ -120,7 +120,7 @@
"group": "Features",
"icon": "stars",
"pages": [
- "servers/tasks",
+ "servers/authorization",
"servers/context",
"servers/elicitation",
"servers/icons",
@@ -129,7 +129,8 @@
"servers/middleware",
"servers/progress",
"servers/sampling",
- "servers/storage-backends"
+ "servers/storage-backends",
+ "servers/tasks"
]
},
{
diff --git a/docs/servers/authorization.mdx b/docs/servers/authorization.mdx
new file mode 100644
index 0000000000..ac65f21cb8
--- /dev/null
+++ b/docs/servers/authorization.mdx
@@ -0,0 +1,348 @@
+---
+title: Authorization
+sidebarTitle: Authorization
+description: Control access to components using callable-based authorization checks that filter visibility and enforce permissions.
+icon: shield-halved
+tag: NEW
+---
+
+import { VersionBadge } from "/snippets/version-badge.mdx"
+
+
+
+Authorization controls what authenticated users can do with your FastMCP server. While [authentication](/servers/auth/authentication) verifies identity (who you are), authorization determines access (what you can do). FastMCP provides a callable-based authorization system that works at both the component level and globally via middleware.
+
+The authorization model centers on a simple concept: callable functions that receive context about the current request and return `True` to allow access or `False` to deny it. Multiple checks combine with AND logic, meaning all checks must pass for access to be granted.
+
+
+Authorization relies on OAuth tokens which are only available with HTTP transports (SSE, Streamable HTTP). In STDIO mode, there's no OAuth mechanism, so `get_access_token()` returns `None` and all auth checks are skipped.
+
+
+## Auth Checks
+
+An auth check is any callable that accepts an `AuthContext` and returns a boolean. The `AuthContext` provides access to the current token (if any) and the component being accessed.
+
+```python
+from fastmcp.server.auth import AuthContext
+
+def my_custom_check(ctx: AuthContext) -> bool:
+ # ctx.token is AccessToken | None
+ # ctx.component is the Tool, Resource, or Prompt being accessed
+ return ctx.token is not None and "special" in ctx.token.scopes
+```
+
+FastMCP provides three built-in auth checks that cover common authorization patterns.
+
+### require_auth
+
+The simplest check verifies that any valid authentication token is present. Unauthenticated requests are denied.
+
+```python
+from fastmcp import FastMCP
+from fastmcp.server.auth import require_auth
+
+mcp = FastMCP("Protected Server")
+
+@mcp.tool(auth=require_auth)
+def protected_operation() -> str:
+ """Only accessible to authenticated users."""
+ return "Success"
+```
+
+### require_scopes
+
+For scope-based authorization, `require_scopes` checks that the token contains all specified OAuth scopes. When multiple scopes are provided, all must be present (AND logic).
+
+```python
+from fastmcp import FastMCP
+from fastmcp.server.auth import require_scopes
+
+mcp = FastMCP("Scoped Server")
+
+@mcp.tool(auth=require_scopes("admin"))
+def admin_operation() -> str:
+ """Requires the 'admin' scope."""
+ return "Admin action completed"
+
+@mcp.tool(auth=require_scopes("read", "write"))
+def read_write_operation() -> str:
+ """Requires both 'read' AND 'write' scopes."""
+ return "Read/write action completed"
+```
+
+### restrict_tag
+
+Tag-based restrictions apply scope requirements conditionally. If a component has the specified tag, the token must have the required scopes. Components without the tag are unaffected.
+
+```python
+from fastmcp import FastMCP
+from fastmcp.server.auth import restrict_tag
+from fastmcp.server.middleware import AuthMiddleware
+
+mcp = FastMCP(
+ "Tagged Server",
+ middleware=[
+ AuthMiddleware(auth=restrict_tag("admin", scopes=["admin"]))
+ ]
+)
+
+@mcp.tool(tags={"admin"})
+def admin_tool() -> str:
+ """Tagged 'admin', so requires 'admin' scope."""
+ return "Admin only"
+
+@mcp.tool(tags={"public"})
+def public_tool() -> str:
+ """Not tagged 'admin', so no scope required by the restriction."""
+ return "Anyone can access"
+```
+
+### Combining Checks
+
+Multiple auth checks can be combined by passing a list. All checks must pass for authorization to succeed (AND logic).
+
+```python
+from fastmcp import FastMCP
+from fastmcp.server.auth import require_auth, require_scopes
+
+mcp = FastMCP("Combined Auth Server")
+
+@mcp.tool(auth=[require_auth, require_scopes("admin")])
+def secure_admin_action() -> str:
+ """Requires authentication AND the 'admin' scope."""
+ return "Secure admin action"
+```
+
+### Custom Auth Checks
+
+Any callable that accepts `AuthContext` and returns `bool` can serve as an auth check. This enables authorization logic based on token claims, component metadata, or external systems.
+
+```python
+from fastmcp import FastMCP
+from fastmcp.server.auth import AuthContext
+
+mcp = FastMCP("Custom Auth Server")
+
+def require_premium_user(ctx: AuthContext) -> bool:
+ """Check for premium user status in token claims."""
+ if ctx.token is None:
+ return False
+ return ctx.token.claims.get("premium", False) is True
+
+def require_access_level(minimum_level: int):
+ """Factory function for level-based authorization."""
+ def check(ctx: AuthContext) -> bool:
+ if ctx.token is None:
+ return False
+ user_level = ctx.token.claims.get("level", 0)
+ return user_level >= minimum_level
+ return check
+
+@mcp.tool(auth=require_premium_user)
+def premium_feature() -> str:
+ """Only for premium users."""
+ return "Premium content"
+
+@mcp.tool(auth=require_access_level(5))
+def advanced_feature() -> str:
+ """Requires access level 5 or higher."""
+ return "Advanced feature"
+```
+
+Auth checks can raise exceptions for explicit denial with custom messages:
+
+- **`AuthorizationError`**: Propagates with its custom message, useful for explaining why access was denied
+- **Other exceptions**: Masked for security (logged internally, treated as denial)
+
+```python
+from fastmcp.server.auth import AuthContext
+from fastmcp.exceptions import AuthorizationError
+
+def require_verified_email(ctx: AuthContext) -> bool:
+ """Require verified email with explicit denial message."""
+ if ctx.token is None:
+ raise AuthorizationError("Authentication required")
+ if not ctx.token.claims.get("email_verified"):
+ raise AuthorizationError("Email verification required")
+ return True
+```
+
+## Component-Level Authorization
+
+The `auth` parameter on decorators controls visibility of individual components. When auth checks fail for the current request, the component is hidden from list responses—it simply doesn't appear.
+
+```python
+from fastmcp import FastMCP
+from fastmcp.server.auth import require_auth, require_scopes
+
+mcp = FastMCP("Component Auth Server")
+
+@mcp.tool(auth=require_auth)
+def authenticated_tool() -> str:
+ """Only visible to authenticated users."""
+ return "Authenticated"
+
+@mcp.resource("secret://data", auth=require_scopes("read"))
+def secret_resource() -> str:
+ """Only visible to users with 'read' scope."""
+ return "Secret data"
+
+@mcp.prompt(auth=require_scopes("admin"))
+def admin_prompt() -> str:
+ """Only visible to users with 'admin' scope."""
+ return "Admin prompt content"
+```
+
+
+Component-level `auth` only controls visibility in list operations. It does not block direct access. Use `AuthMiddleware` to enforce authorization on execution.
+
+
+## Server-Level Authorization
+
+For server-wide authorization enforcement, use `AuthMiddleware`. This middleware applies auth checks globally to all components—filtering list responses and blocking unauthorized execution.
+
+```python
+from fastmcp import FastMCP
+from fastmcp.server.auth import require_auth
+from fastmcp.server.middleware import AuthMiddleware
+
+mcp = FastMCP(
+ "Enforced Auth Server",
+ middleware=[AuthMiddleware(auth=require_auth)]
+)
+
+@mcp.tool
+def any_tool() -> str:
+ """Requires authentication to see AND call."""
+ return "Protected"
+```
+
+### Filtering vs Enforcement
+
+| Behavior | Component-level `auth` | `AuthMiddleware` |
+|----------|------------------------|------------------|
+| Filters list responses | Yes | Yes |
+| Blocks execution | No | Yes (raises `AuthorizationError`) |
+
+Component-level auth is useful for hiding components from unauthorized users while still allowing advanced clients to access them directly. `AuthMiddleware` provides complete enforcement by raising `AuthorizationError` when unauthorized requests attempt execution.
+
+### Tag-Based Global Authorization
+
+A common pattern uses `restrict_tag` with `AuthMiddleware` to apply scope requirements based on component tags.
+
+```python
+from fastmcp import FastMCP
+from fastmcp.server.auth import restrict_tag
+from fastmcp.server.middleware import AuthMiddleware
+
+mcp = FastMCP(
+ "Tag-Based Auth Server",
+ middleware=[
+ AuthMiddleware(auth=restrict_tag("admin", scopes=["admin"])),
+ AuthMiddleware(auth=restrict_tag("write", scopes=["write"])),
+ ]
+)
+
+@mcp.tool(tags={"admin"})
+def delete_all_data() -> str:
+ """Requires 'admin' scope."""
+ return "Deleted"
+
+@mcp.tool(tags={"write"})
+def update_record(id: str, data: str) -> str:
+ """Requires 'write' scope."""
+ return f"Updated {id}"
+
+@mcp.tool
+def read_record(id: str) -> str:
+ """No tag restrictions, accessible to all."""
+ return f"Record {id}"
+```
+
+## Accessing Tokens in Tools
+
+Tools can access the current authentication token using `get_access_token()` from `fastmcp.server.dependencies`. This enables tools to make decisions based on user identity or permissions beyond simple authorization checks.
+
+```python
+from fastmcp import FastMCP
+from fastmcp.server.dependencies import get_access_token
+
+mcp = FastMCP("Token Access Server")
+
+@mcp.tool
+def personalized_greeting() -> str:
+ """Greet the user based on their token claims."""
+ token = get_access_token()
+
+ if token is None:
+ return "Hello, guest!"
+
+ name = token.claims.get("name", "user")
+ return f"Hello, {name}!"
+
+@mcp.tool
+def user_dashboard() -> dict:
+ """Return user-specific data based on token."""
+ token = get_access_token()
+
+ if token is None:
+ return {"error": "Not authenticated"}
+
+ return {
+ "client_id": token.client_id,
+ "scopes": token.scopes,
+ "claims": token.claims,
+ }
+```
+
+## Reference
+
+### AccessToken
+
+The `AccessToken` object contains information extracted from the OAuth token.
+
+| Property | Type | Description |
+|----------|------|-------------|
+| `token` | `str` | The raw token string |
+| `client_id` | `str \| None` | OAuth client identifier |
+| `scopes` | `list[str]` | Granted OAuth scopes |
+| `expires_at` | `datetime \| None` | Token expiration time |
+| `claims` | `dict[str, Any]` | All JWT claims or custom token data |
+
+### AuthContext
+
+The `AuthContext` dataclass is passed to all auth check functions.
+
+| Property | Type | Description |
+|----------|------|-------------|
+| `token` | `AccessToken \| None` | Current access token, or `None` if unauthenticated |
+| `component` | `Tool \| Resource \| Prompt` | The component being accessed |
+
+Access to the component object enables authorization decisions based on metadata like tags, name, or custom properties.
+
+```python
+from fastmcp.server.auth import AuthContext
+
+def require_matching_tag(ctx: AuthContext) -> bool:
+ """Require a scope matching each of the component's tags."""
+ if ctx.token is None:
+ return False
+ user_scopes = set(ctx.token.scopes)
+ return ctx.component.tags.issubset(user_scopes)
+```
+
+### Imports
+
+```python
+from fastmcp.server.auth import (
+ AccessToken, # Token with .token, .client_id, .scopes, .expires_at, .claims
+ AuthContext, # Context with .token, .component
+ AuthCheck, # Type alias: Callable[[AuthContext], bool]
+ require_auth, # Built-in: requires any valid token
+ require_scopes, # Built-in: requires specific scopes
+ restrict_tag, # Built-in: tag-based scope requirements
+ run_auth_checks, # Utility: run checks with AND logic
+)
+
+from fastmcp.server.middleware import AuthMiddleware
+```
diff --git a/docs/servers/context.mdx b/docs/servers/context.mdx
index cf365cd28a..3dc1b85e71 100644
--- a/docs/servers/context.mdx
+++ b/docs/servers/context.mdx
@@ -302,6 +302,8 @@ async def my_tool(ctx: Context) -> None:
### Transport
+
+
The `ctx.transport` property indicates which transport is being used to run the server. This is useful when your tool needs to behave differently depending on whether the server is running over STDIO, SSE, or Streamable HTTP. For example, you might want to return shorter responses over STDIO or adjust timeout behavior based on transport characteristics.
The transport type is set once when the server starts and remains constant for the server's lifetime. It returns `None` when called outside of a server context (for example, in unit tests or when running code outside of an MCP request).
diff --git a/docs/servers/lifespan.mdx b/docs/servers/lifespan.mdx
index 9171ed8929..45e1f775e9 100644
--- a/docs/servers/lifespan.mdx
+++ b/docs/servers/lifespan.mdx
@@ -1,6 +1,8 @@
---
title: Lifespans
description: Server-level setup and teardown with composable lifespans
+icon: heart-pulse
+tag: NEW
---
Lifespans let you run code once when the server starts and clean up when it stops. Unlike per-session handlers, lifespans run exactly once regardless of how many clients connect.
diff --git a/src/fastmcp/exceptions.py b/src/fastmcp/exceptions.py
index 4c1d6b590c..595961e3b7 100644
--- a/src/fastmcp/exceptions.py
+++ b/src/fastmcp/exceptions.py
@@ -37,3 +37,7 @@ class NotFoundError(Exception):
class DisabledError(Exception):
"""Object is disabled."""
+
+
+class AuthorizationError(FastMCPError):
+ """Error when authorization check fails."""
diff --git a/src/fastmcp/prompts/prompt.py b/src/fastmcp/prompts/prompt.py
index 7b2f9f4560..0b27bac384 100644
--- a/src/fastmcp/prompts/prompt.py
+++ b/src/fastmcp/prompts/prompt.py
@@ -32,6 +32,7 @@
without_injected_parameters,
)
from fastmcp.server.tasks.config import TaskConfig, TaskMeta
+from fastmcp.tools.tool import AuthCheckCallable
from fastmcp.utilities.components import FastMCPComponent
from fastmcp.utilities.json_schema import compress_schema
from fastmcp.utilities.logging import get_logger
@@ -200,6 +201,9 @@ class Prompt(FastMCPComponent):
arguments: list[PromptArgument] | None = Field(
default=None, description="Arguments that can be passed to the prompt"
)
+ auth: AuthCheckCallable | list[AuthCheckCallable] | None = Field(
+ default=None, description="Authorization checks for this prompt", exclude=True
+ )
def to_mcp_prompt(
self,
@@ -238,6 +242,7 @@ def from_function(
tags: set[str] | None = None,
meta: dict[str, Any] | None = None,
task: bool | TaskConfig | None = None,
+ auth: AuthCheckCallable | list[AuthCheckCallable] | None = None,
) -> FunctionPrompt:
"""Create a Prompt from a function.
@@ -255,6 +260,7 @@ def from_function(
tags=tags,
meta=meta,
task=task,
+ auth=auth,
)
async def render(
@@ -406,6 +412,7 @@ def from_function(
tags: set[str] | None = None,
meta: dict[str, Any] | None = None,
task: bool | TaskConfig | None = None,
+ auth: AuthCheckCallable | list[AuthCheckCallable] | None = None,
) -> FunctionPrompt:
"""Create a Prompt from a function.
@@ -504,6 +511,7 @@ def from_function(
fn=wrapped_fn,
meta=meta,
task_config=task_config,
+ auth=auth,
)
def _convert_string_arguments(self, kwargs: dict[str, Any]) -> dict[str, Any]:
@@ -635,6 +643,7 @@ def prompt(
tags: set[str] | None = None,
meta: dict[str, Any] | None = None,
task: bool | TaskConfig | None = None,
+ auth: AuthCheckCallable | list[AuthCheckCallable] | None = None,
) -> Callable[[AnyFunction], FunctionPrompt]: ...
@@ -649,6 +658,7 @@ def prompt(
tags: set[str] | None = None,
meta: dict[str, Any] | None = None,
task: bool | TaskConfig | None = None,
+ auth: AuthCheckCallable | list[AuthCheckCallable] | None = None,
) -> Callable[[AnyFunction], FunctionPrompt]: ...
@@ -662,6 +672,7 @@ def prompt(
tags: set[str] | None = None,
meta: dict[str, Any] | None = None,
task: bool | TaskConfig | None = None,
+ auth: AuthCheckCallable | list[AuthCheckCallable] | None = None,
) -> (
Callable[[AnyFunction], FunctionPrompt]
| FunctionPrompt
@@ -693,6 +704,7 @@ def prompt(
tags: Optional set of tags for categorizing the prompt
meta: Optional meta information about the prompt
task: Optional task configuration for background execution (default False)
+ auth: Optional authorization checks for the prompt
Returns:
A FunctionPrompt when decorating a function, or a decorator function when
@@ -748,6 +760,7 @@ def my_prompt(data: str) -> str:
tags=tags,
meta=meta,
task=supports_task,
+ auth=auth,
)
elif isinstance(name_or_fn, str):
@@ -776,4 +789,5 @@ def my_prompt(data: str) -> str:
tags=tags,
meta=meta,
task=task,
+ auth=auth,
)
diff --git a/src/fastmcp/resources/resource.py b/src/fastmcp/resources/resource.py
index 8d2c023c33..e39046338c 100644
--- a/src/fastmcp/resources/resource.py
+++ b/src/fastmcp/resources/resource.py
@@ -33,6 +33,7 @@
without_injected_parameters,
)
from fastmcp.server.tasks.config import TaskConfig, TaskMeta
+from fastmcp.tools.tool import AuthCheckCallable
from fastmcp.utilities.components import FastMCPComponent
from fastmcp.utilities.types import get_fn_name
@@ -229,6 +230,10 @@ class Resource(FastMCPComponent):
Annotations | None,
Field(description="Optional annotations about the resource's behavior"),
] = None
+ auth: Annotated[
+ AuthCheckCallable | list[AuthCheckCallable] | None,
+ Field(description="Authorization checks for this resource", exclude=True),
+ ] = None
@staticmethod
def from_function(
@@ -243,6 +248,7 @@ def from_function(
annotations: Annotations | None = None,
meta: dict[str, Any] | None = None,
task: bool | TaskConfig | None = None,
+ auth: AuthCheckCallable | list[AuthCheckCallable] | None = None,
) -> FunctionResource:
return FunctionResource.from_function(
fn=fn,
@@ -256,6 +262,7 @@ def from_function(
annotations=annotations,
meta=meta,
task=task,
+ auth=auth,
)
@field_validator("mime_type", mode="before")
@@ -431,6 +438,7 @@ def from_function(
annotations: Annotations | None = None,
meta: dict[str, Any] | None = None,
task: bool | TaskConfig | None = None,
+ auth: AuthCheckCallable | list[AuthCheckCallable] | None = None,
) -> FunctionResource:
"""Create a FunctionResource from a function."""
if isinstance(uri, str):
@@ -465,6 +473,7 @@ def from_function(
annotations=annotations,
meta=meta,
task_config=task_config,
+ auth=auth,
)
async def read(
@@ -510,6 +519,7 @@ def resource(
annotations: Annotations | dict[str, Any] | None = None,
meta: dict[str, Any] | None = None,
task: bool | TaskConfig | None = None,
+ auth: AuthCheckCallable | list[AuthCheckCallable] | None = None,
) -> Callable[[AnyFunction], Resource | ResourceTemplate]:
"""Standalone decorator to create a resource without registering it to a server.
@@ -536,6 +546,7 @@ def resource(
annotations: Optional annotations about the resource's behavior
meta: Optional meta information about the resource
task: Optional task configuration for background execution (default False)
+ auth: Optional authorization checks for the resource
Returns:
A decorator function that returns a Resource or ResourceTemplate.
@@ -608,6 +619,7 @@ def decorator(fn: AnyFunction) -> Resource | ResourceTemplate:
annotations=annotations,
meta=meta,
task=supports_task,
+ auth=auth,
)
else:
return Resource.from_function(
@@ -622,6 +634,7 @@ def decorator(fn: AnyFunction) -> Resource | ResourceTemplate:
annotations=annotations,
meta=meta,
task=supports_task,
+ auth=auth,
)
return decorator
diff --git a/src/fastmcp/resources/template.py b/src/fastmcp/resources/template.py
index 4f7c329288..97a26c9a5a 100644
--- a/src/fastmcp/resources/template.py
+++ b/src/fastmcp/resources/template.py
@@ -27,6 +27,7 @@
without_injected_parameters,
)
from fastmcp.server.tasks.config import TaskConfig, TaskMeta
+from fastmcp.tools.tool import AuthCheckCallable
from fastmcp.utilities.components import FastMCPComponent
from fastmcp.utilities.json_schema import compress_schema
from fastmcp.utilities.types import get_cached_typeadapter
@@ -114,6 +115,11 @@ class ResourceTemplate(FastMCPComponent):
annotations: Annotations | None = Field(
default=None, description="Optional annotations about the resource's behavior"
)
+ auth: AuthCheckCallable | list[AuthCheckCallable] | None = Field(
+ default=None,
+ description="Authorization checks for this resource template",
+ exclude=True,
+ )
def __repr__(self) -> str:
return f"{self.__class__.__name__}(uri_template={self.uri_template!r}, name={self.name!r}, description={self.description!r}, tags={self.tags})"
@@ -131,6 +137,7 @@ def from_function(
annotations: Annotations | None = None,
meta: dict[str, Any] | None = None,
task: bool | TaskConfig | None = None,
+ auth: AuthCheckCallable | list[AuthCheckCallable] | None = None,
) -> FunctionResourceTemplate:
return FunctionResourceTemplate.from_function(
fn=fn,
@@ -144,6 +151,7 @@ def from_function(
annotations=annotations,
meta=meta,
task=task,
+ auth=auth,
)
@field_validator("mime_type", mode="before")
@@ -370,6 +378,7 @@ async def resource_read_fn() -> str | bytes | ResourceResult:
mime_type=self.mime_type,
tags=self.tags,
task=self.task_config,
+ auth=self.auth,
)
async def read(self, arguments: dict[str, Any]) -> str | bytes | ResourceResult:
@@ -452,6 +461,7 @@ def from_function(
annotations: Annotations | None = None,
meta: dict[str, Any] | None = None,
task: bool | TaskConfig | None = None,
+ auth: AuthCheckCallable | list[AuthCheckCallable] | None = None,
) -> FunctionResourceTemplate:
"""Create a template from a function."""
@@ -562,4 +572,5 @@ def from_function(
annotations=annotations,
meta=meta,
task_config=task_config,
+ auth=auth,
)
diff --git a/src/fastmcp/server/auth/__init__.py b/src/fastmcp/server/auth/__init__.py
index 43f2f3f2e4..d8d221a3d2 100644
--- a/src/fastmcp/server/auth/__init__.py
+++ b/src/fastmcp/server/auth/__init__.py
@@ -5,6 +5,14 @@
AccessToken,
AuthProvider,
)
+from .authorization import (
+ AuthCheck,
+ AuthContext,
+ require_auth,
+ require_scopes,
+ restrict_tag,
+ run_auth_checks,
+)
from .providers.debug import DebugTokenVerifier
from .providers.jwt import JWTVerifier, StaticTokenVerifier
from .oauth_proxy import OAuthProxy
@@ -13,6 +21,8 @@
__all__ = [
"AccessToken",
+ "AuthCheck",
+ "AuthContext",
"AuthProvider",
"DebugTokenVerifier",
"JWTVerifier",
@@ -22,4 +32,8 @@
"RemoteAuthProvider",
"StaticTokenVerifier",
"TokenVerifier",
+ "require_auth",
+ "require_scopes",
+ "restrict_tag",
+ "run_auth_checks",
]
diff --git a/src/fastmcp/server/auth/authorization.py b/src/fastmcp/server/auth/authorization.py
new file mode 100644
index 0000000000..ae9e64a5b1
--- /dev/null
+++ b/src/fastmcp/server/auth/authorization.py
@@ -0,0 +1,190 @@
+"""Authorization checks for FastMCP components.
+
+This module provides callable-based authorization for tools, resources, and prompts.
+Auth checks are functions that receive an AuthContext and return True to allow access
+or False to deny.
+
+Auth checks can also raise exceptions:
+- AuthorizationError: Propagates with the custom message for explicit denial
+- Other exceptions: Masked for security (logged, treated as auth failure)
+
+Example:
+ ```python
+ from fastmcp import FastMCP
+ from fastmcp.server.auth import require_auth, require_scopes
+
+ mcp = FastMCP()
+
+ @mcp.tool(auth=require_auth)
+ def protected_tool(): ...
+
+ @mcp.resource("data://secret", auth=require_scopes("read"))
+ def secret_data(): ...
+
+ @mcp.prompt(auth=require_auth)
+ def admin_prompt(): ...
+ ```
+"""
+
+from __future__ import annotations
+
+import logging
+from collections.abc import Callable
+from dataclasses import dataclass
+from typing import TYPE_CHECKING, cast
+
+from fastmcp.exceptions import AuthorizationError
+
+logger = logging.getLogger(__name__)
+
+if TYPE_CHECKING:
+ from fastmcp.server.auth import AccessToken
+ from fastmcp.tools.tool import Tool
+ from fastmcp.utilities.components import FastMCPComponent
+
+
+@dataclass
+class AuthContext:
+ """Context passed to auth check callables.
+
+ This object is passed to each auth check function and provides
+ access to the current authentication token and the component being accessed.
+
+ Attributes:
+ token: The current access token, or None if unauthenticated.
+ component: The component (tool, resource, or prompt) being accessed.
+ tool: Backwards-compatible alias for component when it's a Tool.
+ """
+
+ token: AccessToken | None
+ component: FastMCPComponent
+
+ @property
+ def tool(self) -> Tool | None:
+ """Backwards-compatible access to the component as a Tool.
+
+ Returns the component if it's a Tool, None otherwise.
+ """
+ from fastmcp.tools.tool import Tool
+
+ return self.component if isinstance(self.component, Tool) else None
+
+
+# Type alias for auth check functions
+AuthCheck = Callable[[AuthContext], bool]
+
+
+def require_auth(ctx: AuthContext) -> bool:
+ """Require any valid authentication.
+
+ Returns True if the request has a valid token, False otherwise.
+
+ Example:
+ ```python
+ @mcp.tool(auth=require_auth)
+ def protected_tool(): ...
+ ```
+ """
+ return ctx.token is not None
+
+
+def require_scopes(*scopes: str) -> AuthCheck:
+ """Require specific OAuth scopes.
+
+ Returns an auth check that requires ALL specified scopes to be present
+ in the token (AND logic).
+
+ Args:
+ *scopes: One or more scope strings that must all be present.
+
+ Example:
+ ```python
+ @mcp.tool(auth=require_scopes("admin"))
+ def admin_tool(): ...
+
+ @mcp.tool(auth=require_scopes("read", "write"))
+ def read_write_tool(): ...
+ ```
+ """
+ required = set(scopes)
+
+ def check(ctx: AuthContext) -> bool:
+ if ctx.token is None:
+ return False
+ return required.issubset(set(ctx.token.scopes))
+
+ return check
+
+
+def restrict_tag(tag: str, *, scopes: list[str]) -> AuthCheck:
+ """Restrict components with a specific tag to require certain scopes.
+
+ If the component has the specified tag, the token must have ALL the
+ required scopes. If the component doesn't have the tag, access is allowed.
+
+ Args:
+ tag: The tag that triggers the scope requirement.
+ scopes: List of scopes required when the tag is present.
+
+ Example:
+ ```python
+ # Components tagged "admin" require the "admin" scope
+ AuthMiddleware(auth=restrict_tag("admin", scopes=["admin"]))
+ ```
+ """
+ required = set(scopes)
+
+ def check(ctx: AuthContext) -> bool:
+ if tag not in ctx.component.tags:
+ return True # Tag not present, no restriction
+ if ctx.token is None:
+ return False
+ return required.issubset(set(ctx.token.scopes))
+
+ return check
+
+
+def run_auth_checks(
+ checks: AuthCheck | list[AuthCheck],
+ ctx: AuthContext,
+) -> bool:
+ """Run auth checks with AND logic.
+
+ All checks must pass for authorization to succeed.
+
+ Auth checks can:
+ - Return True to allow access
+ - Return False to deny access
+ - Raise AuthorizationError to deny with a custom message (propagates)
+ - Raise other exceptions (masked for security, treated as denial)
+
+ Args:
+ checks: A single check function or list of check functions.
+ ctx: The auth context to pass to each check.
+
+ Returns:
+ True if all checks pass, False if any check fails.
+
+ Raises:
+ AuthorizationError: If an auth check explicitly raises it.
+ """
+ check_list = [checks] if not isinstance(checks, list) else checks
+ check_list = cast(list[AuthCheck], check_list)
+
+ for check in check_list:
+ try:
+ if not check(ctx):
+ return False
+ except AuthorizationError:
+ # Let AuthorizationError propagate with its custom message
+ raise
+ except Exception:
+ # Mask other exceptions for security - log and treat as auth failure
+ logger.warning(
+ f"Auth check {getattr(check, '__name__', repr(check))} "
+ "raised an unexpected exception",
+ exc_info=True,
+ )
+ return False
+
+ return True
diff --git a/src/fastmcp/server/dependencies.py b/src/fastmcp/server/dependencies.py
index d2698670c0..28987431c8 100644
--- a/src/fastmcp/server/dependencies.py
+++ b/src/fastmcp/server/dependencies.py
@@ -417,7 +417,7 @@ def get_access_token() -> AccessToken | None:
# Optional fields
expires_at=access_token_as_dict.get("expires_at"),
resource=access_token_as_dict.get("resource"),
- claims=access_token_as_dict.get("claims"),
+ claims=access_token_as_dict.get("claims") or {},
)
except Exception as e:
raise TypeError(
diff --git a/src/fastmcp/server/middleware/__init__.py b/src/fastmcp/server/middleware/__init__.py
index 98afc1875f..8df6962bdb 100644
--- a/src/fastmcp/server/middleware/__init__.py
+++ b/src/fastmcp/server/middleware/__init__.py
@@ -1,3 +1,4 @@
+from .authorization import AuthMiddleware
from .middleware import (
CallNext,
Middleware,
@@ -6,6 +7,7 @@
from .ping import PingMiddleware
__all__ = [
+ "AuthMiddleware",
"CallNext",
"Middleware",
"MiddlewareContext",
diff --git a/src/fastmcp/server/middleware/authorization.py b/src/fastmcp/server/middleware/authorization.py
new file mode 100644
index 0000000000..f9f69ecd14
--- /dev/null
+++ b/src/fastmcp/server/middleware/authorization.py
@@ -0,0 +1,299 @@
+"""Authorization middleware for FastMCP.
+
+This module provides middleware-based authorization using callable auth checks.
+AuthMiddleware applies auth checks globally to all components on the server.
+
+Example:
+ ```python
+ from fastmcp import FastMCP
+ from fastmcp.server.auth import require_auth, require_scopes, restrict_tag
+ from fastmcp.server.middleware import AuthMiddleware
+
+ # Require auth for all components
+ mcp = FastMCP(middleware=[
+ AuthMiddleware(auth=require_auth)
+ ])
+
+ # Tag-based: components tagged "admin" require "admin" scope
+ mcp = FastMCP(middleware=[
+ AuthMiddleware(auth=restrict_tag("admin", scopes=["admin"]))
+ ])
+ ```
+"""
+
+from __future__ import annotations
+
+import logging
+from collections.abc import Sequence
+
+import mcp.types as mt
+
+from fastmcp.exceptions import AuthorizationError, NotFoundError
+from fastmcp.prompts.prompt import Prompt, PromptResult
+from fastmcp.resources.resource import Resource, ResourceResult
+from fastmcp.resources.template import ResourceTemplate
+from fastmcp.server.auth.authorization import (
+ AuthCheck,
+ AuthContext,
+ run_auth_checks,
+)
+from fastmcp.server.dependencies import get_access_token
+from fastmcp.server.middleware.middleware import (
+ CallNext,
+ Middleware,
+ MiddlewareContext,
+)
+from fastmcp.tools.tool import Tool, ToolResult
+
+logger = logging.getLogger(__name__)
+
+
+class AuthMiddleware(Middleware):
+ """Global authorization middleware using callable checks.
+
+ This middleware applies auth checks to all components (tools, resources,
+ prompts) on the server. It uses the same callable API as component-level
+ auth checks.
+
+ The middleware:
+ - Filters tools/resources/prompts from list responses based on auth checks
+ - Checks auth before tool execution, resource read, and prompt render
+ - Skips all auth checks for STDIO transport (no OAuth concept)
+
+ Args:
+ auth: A single auth check function or list of check functions.
+ All checks must pass for authorization to succeed (AND logic).
+
+ Example:
+ ```python
+ from fastmcp import FastMCP
+ from fastmcp.server.auth import require_auth, require_scopes
+
+ # Require any authentication for all components
+ mcp = FastMCP(middleware=[AuthMiddleware(auth=require_auth)])
+
+ # Require specific scope for all components
+ mcp = FastMCP(middleware=[AuthMiddleware(auth=require_scopes("api"))])
+
+ # Combined checks (AND logic)
+ mcp = FastMCP(middleware=[
+ AuthMiddleware(auth=[require_auth, require_scopes("api")])
+ ])
+ ```
+ """
+
+ def __init__(self, auth: AuthCheck | list[AuthCheck]) -> None:
+ self.auth = auth
+
+ async def on_list_tools(
+ self,
+ context: MiddlewareContext[mt.ListToolsRequest],
+ call_next: CallNext[mt.ListToolsRequest, Sequence[Tool]],
+ ) -> Sequence[Tool]:
+ """Filter tools/list response based on auth checks."""
+ tools = await call_next(context)
+
+ # STDIO has no auth concept, skip filtering
+ # Late import to avoid circular import with context.py
+ from fastmcp.server.context import _current_transport
+
+ if _current_transport.get() == "stdio":
+ return tools
+
+ token = get_access_token()
+
+ authorized_tools: list[Tool] = []
+ for tool in tools:
+ ctx = AuthContext(token=token, component=tool)
+ if run_auth_checks(self.auth, ctx):
+ authorized_tools.append(tool)
+
+ return authorized_tools
+
+ async def on_call_tool(
+ self,
+ context: MiddlewareContext[mt.CallToolRequestParams],
+ call_next: CallNext[mt.CallToolRequestParams, ToolResult],
+ ) -> ToolResult:
+ """Check auth before tool execution."""
+ # STDIO has no auth concept, skip enforcement
+ # Late import to avoid circular import with context.py
+ from fastmcp.server.context import _current_transport
+
+ if _current_transport.get() == "stdio":
+ return await call_next(context)
+
+ # Get the tool being called
+ tool_name = context.message.name
+ fastmcp = context.fastmcp_context
+ if fastmcp is None:
+ # Fail closed: deny access when context is missing
+ logger.warning(
+ f"AuthMiddleware: fastmcp_context is None for tool '{tool_name}'. "
+ "Denying access for security."
+ )
+ raise AuthorizationError(
+ f"Authorization failed for tool '{tool_name}': missing context"
+ )
+
+ tool = await fastmcp.fastmcp.get_tool(tool_name)
+
+ token = get_access_token()
+ ctx = AuthContext(token=token, component=tool)
+
+ if not run_auth_checks(self.auth, ctx):
+ raise AuthorizationError(
+ f"Authorization failed for tool '{tool_name}': insufficient permissions"
+ )
+
+ return await call_next(context)
+
+ async def on_list_resources(
+ self,
+ context: MiddlewareContext[mt.ListResourcesRequest],
+ call_next: CallNext[mt.ListResourcesRequest, Sequence[Resource]],
+ ) -> Sequence[Resource]:
+ """Filter resources/list response based on auth checks."""
+ resources = await call_next(context)
+
+ # STDIO has no auth concept, skip filtering
+ from fastmcp.server.context import _current_transport
+
+ if _current_transport.get() == "stdio":
+ return resources
+
+ token = get_access_token()
+
+ authorized_resources: list[Resource] = []
+ for resource in resources:
+ ctx = AuthContext(token=token, component=resource)
+ if run_auth_checks(self.auth, ctx):
+ authorized_resources.append(resource)
+
+ return authorized_resources
+
+ async def on_read_resource(
+ self,
+ context: MiddlewareContext[mt.ReadResourceRequestParams],
+ call_next: CallNext[mt.ReadResourceRequestParams, ResourceResult],
+ ) -> ResourceResult:
+ """Check auth before resource read."""
+ # STDIO has no auth concept, skip enforcement
+ from fastmcp.server.context import _current_transport
+
+ if _current_transport.get() == "stdio":
+ return await call_next(context)
+
+ # Get the resource being read
+ uri = context.message.uri
+ fastmcp = context.fastmcp_context
+ if fastmcp is None:
+ logger.warning(
+ f"AuthMiddleware: fastmcp_context is None for resource '{uri}'. "
+ "Denying access for security."
+ )
+ raise AuthorizationError(
+ f"Authorization failed for resource '{uri}': missing context"
+ )
+
+ # Try concrete resource first, then template (for template-backed URIs)
+ try:
+ component = await fastmcp.fastmcp.get_resource(str(uri))
+ except NotFoundError:
+ component = await fastmcp.fastmcp.get_resource_template(str(uri))
+
+ token = get_access_token()
+ ctx = AuthContext(token=token, component=component)
+
+ if not run_auth_checks(self.auth, ctx):
+ raise AuthorizationError(
+ f"Authorization failed for resource '{uri}': insufficient permissions"
+ )
+
+ return await call_next(context)
+
+ async def on_list_resource_templates(
+ self,
+ context: MiddlewareContext[mt.ListResourceTemplatesRequest],
+ call_next: CallNext[
+ mt.ListResourceTemplatesRequest, Sequence[ResourceTemplate]
+ ],
+ ) -> Sequence[ResourceTemplate]:
+ """Filter resource templates/list response based on auth checks."""
+ templates = await call_next(context)
+
+ # STDIO has no auth concept, skip filtering
+ from fastmcp.server.context import _current_transport
+
+ if _current_transport.get() == "stdio":
+ return templates
+
+ token = get_access_token()
+
+ authorized_templates: list[ResourceTemplate] = []
+ for template in templates:
+ ctx = AuthContext(token=token, component=template)
+ if run_auth_checks(self.auth, ctx):
+ authorized_templates.append(template)
+
+ return authorized_templates
+
+ async def on_list_prompts(
+ self,
+ context: MiddlewareContext[mt.ListPromptsRequest],
+ call_next: CallNext[mt.ListPromptsRequest, Sequence[Prompt]],
+ ) -> Sequence[Prompt]:
+ """Filter prompts/list response based on auth checks."""
+ prompts = await call_next(context)
+
+ # STDIO has no auth concept, skip filtering
+ from fastmcp.server.context import _current_transport
+
+ if _current_transport.get() == "stdio":
+ return prompts
+
+ token = get_access_token()
+
+ authorized_prompts: list[Prompt] = []
+ for prompt in prompts:
+ ctx = AuthContext(token=token, component=prompt)
+ if run_auth_checks(self.auth, ctx):
+ authorized_prompts.append(prompt)
+
+ return authorized_prompts
+
+ async def on_get_prompt(
+ self,
+ context: MiddlewareContext[mt.GetPromptRequestParams],
+ call_next: CallNext[mt.GetPromptRequestParams, PromptResult],
+ ) -> PromptResult:
+ """Check auth before prompt render."""
+ # STDIO has no auth concept, skip enforcement
+ from fastmcp.server.context import _current_transport
+
+ if _current_transport.get() == "stdio":
+ return await call_next(context)
+
+ # Get the prompt being rendered
+ prompt_name = context.message.name
+ fastmcp = context.fastmcp_context
+ if fastmcp is None:
+ logger.warning(
+ f"AuthMiddleware: fastmcp_context is None for prompt '{prompt_name}'. "
+ "Denying access for security."
+ )
+ raise AuthorizationError(
+ f"Authorization failed for prompt '{prompt_name}': missing context"
+ )
+
+ prompt = await fastmcp.fastmcp.get_prompt(prompt_name)
+
+ token = get_access_token()
+ ctx = AuthContext(token=token, component=prompt)
+
+ if not run_auth_checks(self.auth, ctx):
+ raise AuthorizationError(
+ f"Authorization failed for prompt '{prompt_name}': insufficient permissions"
+ )
+
+ return await call_next(context)
diff --git a/src/fastmcp/server/providers/local_provider.py b/src/fastmcp/server/providers/local_provider.py
index 645d17543c..d29daab6ec 100644
--- a/src/fastmcp/server/providers/local_provider.py
+++ b/src/fastmcp/server/providers/local_provider.py
@@ -41,7 +41,7 @@ def greet(name: str) -> str:
from fastmcp.resources.template import ResourceTemplate
from fastmcp.server.providers.base import Provider
from fastmcp.server.tasks.config import TaskConfig
-from fastmcp.tools.tool import FunctionTool, Tool
+from fastmcp.tools.tool import AuthCheckCallable, FunctionTool, Tool
from fastmcp.tools.tool_transform import (
ToolTransformConfig,
apply_transformations_to_tools,
@@ -363,6 +363,7 @@ def tool(
enabled: bool = True,
task: bool | TaskConfig | None = None,
serializer: ToolResultSerializerType | None = None, # Deprecated
+ auth: AuthCheckCallable | list[AuthCheckCallable] | None = None,
) -> FunctionTool: ...
@overload
@@ -382,6 +383,7 @@ def tool(
enabled: bool = True,
task: bool | TaskConfig | None = None,
serializer: ToolResultSerializerType | None = None, # Deprecated
+ auth: AuthCheckCallable | list[AuthCheckCallable] | None = None,
) -> Callable[[AnyFunction], FunctionTool]: ...
# NOTE: This method mirrors fastmcp.tools.tool() but adds registration,
@@ -404,6 +406,7 @@ def tool(
enabled: bool = True,
task: bool | TaskConfig | None = None,
serializer: ToolResultSerializerType | None = None, # Deprecated
+ auth: AuthCheckCallable | list[AuthCheckCallable] | None = None,
) -> (
Callable[[AnyFunction], FunctionTool]
| FunctionTool
@@ -496,6 +499,7 @@ def my_tool(x: int) -> str:
meta=meta,
serializer=serializer,
task=supports_task,
+ auth=auth,
)
self.add_tool(tool_obj)
# If disabled, add to blocklist
@@ -534,6 +538,7 @@ def my_tool(x: int) -> str:
enabled=enabled,
task=task,
serializer=serializer,
+ auth=auth,
)
def resource(
@@ -550,6 +555,7 @@ def resource(
annotations: Annotations | dict[str, Any] | None = None,
meta: dict[str, Any] | None = None,
task: bool | TaskConfig | None = None,
+ auth: AuthCheckCallable | list[AuthCheckCallable] | None = None,
) -> Callable[[AnyFunction], Resource | ResourceTemplate]:
"""Decorator to register a function as a resource.
@@ -568,6 +574,7 @@ def resource(
annotations: Optional annotations about the resource's behavior
meta: Optional meta information about the resource
task: Optional task configuration for background execution
+ auth: Optional authorization checks for the resource
Returns:
A decorator function.
@@ -600,6 +607,7 @@ def get_weather(city: str) -> str:
annotations=annotations,
meta=meta,
task=supports_task,
+ auth=auth,
)
def decorator(fn: AnyFunction) -> Resource | ResourceTemplate:
@@ -630,6 +638,7 @@ def prompt(
enabled: bool = True,
meta: dict[str, Any] | None = None,
task: bool | TaskConfig | None = None,
+ auth: AuthCheckCallable | list[AuthCheckCallable] | None = None,
) -> FunctionPrompt: ...
@overload
@@ -645,6 +654,7 @@ def prompt(
enabled: bool = True,
meta: dict[str, Any] | None = None,
task: bool | TaskConfig | None = None,
+ auth: AuthCheckCallable | list[AuthCheckCallable] | None = None,
) -> Callable[[AnyFunction], FunctionPrompt]: ...
def prompt(
@@ -659,6 +669,7 @@ def prompt(
enabled: bool = True,
meta: dict[str, Any] | None = None,
task: bool | TaskConfig | None = None,
+ auth: AuthCheckCallable | list[AuthCheckCallable] | None = None,
) -> (
Callable[[AnyFunction], FunctionPrompt]
| FunctionPrompt
@@ -683,6 +694,7 @@ def prompt(
enabled: Whether the prompt is enabled (default True). If False, adds to blocklist.
meta: Optional meta information about the prompt
task: Optional task configuration for background execution
+ auth: Optional authorization checks for the prompt
Returns:
The registered FunctionPrompt or a decorator function.
@@ -723,6 +735,7 @@ def register(prompt_obj: FunctionPrompt) -> FunctionPrompt:
tags=tags,
meta=meta,
task=supports_task,
+ auth=auth,
)
# If standalone returned a FunctionPrompt directly (@prompt without parens),
diff --git a/src/fastmcp/server/server.py b/src/fastmcp/server/server.py
index d1b48ae9a9..d160fbbdba 100644
--- a/src/fastmcp/server/server.py
+++ b/src/fastmcp/server/server.py
@@ -55,6 +55,7 @@
import fastmcp
import fastmcp.server
from fastmcp.exceptions import (
+ AuthorizationError,
DisabledError,
FastMCPError,
NotFoundError,
@@ -68,7 +69,8 @@
from fastmcp.prompts.prompt import FunctionPrompt, PromptResult
from fastmcp.resources.resource import Resource, ResourceResult
from fastmcp.resources.template import ResourceTemplate
-from fastmcp.server.auth import AuthProvider
+from fastmcp.server.auth import AuthContext, AuthProvider, run_auth_checks
+from fastmcp.server.dependencies import get_access_token
from fastmcp.server.event_store import EventStore
from fastmcp.server.http import (
StarletteWithLifespan,
@@ -83,7 +85,7 @@
from fastmcp.server.tasks.config import TaskConfig, TaskMeta
from fastmcp.settings import DuplicateBehavior as DuplicateBehaviorSetting
from fastmcp.settings import Settings
-from fastmcp.tools.tool import FunctionTool, Tool, ToolResult
+from fastmcp.tools.tool import AuthCheckCallable, FunctionTool, Tool, ToolResult
from fastmcp.tools.tool_transform import ToolTransformConfig
from fastmcp.utilities.async_utils import gather
from fastmcp.utilities.cli import log_server_banner
@@ -156,6 +158,23 @@ def _resolve_on_duplicate(
]
+def _get_auth_context() -> tuple[bool, Any]:
+ """Get auth context for the current request.
+
+ Returns a tuple of (skip_auth, token) where:
+ - skip_auth=True means auth checks should be skipped (STDIO transport)
+ - token is the access token for HTTP transports (may be None if unauthenticated)
+
+ Uses late import to avoid circular import with context.py.
+ """
+ from fastmcp.server.context import _current_transport
+
+ is_stdio = _current_transport.get() == "stdio"
+ if is_stdio:
+ return (True, None)
+ return (False, get_access_token())
+
+
@asynccontextmanager
async def default_lifespan(server: FastMCP[LifespanResultT]) -> AsyncIterator[Any]:
"""Default lifespan context manager that does nothing.
@@ -859,6 +878,9 @@ async def get_tools(self, *, run_middleware: bool = False) -> list[Tool]:
return_exceptions=True,
)
+ # Get auth context (skip_auth=True for STDIO which has no auth concept)
+ skip_auth, token = _get_auth_context()
+
all_tools: dict[str, Tool] = {}
for i, result in enumerate(results):
if isinstance(result, BaseException):
@@ -868,21 +890,34 @@ async def get_tools(self, *, run_middleware: bool = False) -> list[Tool]:
raise result
continue
for tool in result:
- if self._is_component_enabled(tool) and tool.key not in all_tools:
- all_tools[tool.key] = tool
+ if not self._is_component_enabled(tool) or tool.key in all_tools:
+ continue
+ # Check tool-level auth (skip for STDIO)
+ if not skip_auth and tool.auth is not None:
+ ctx = AuthContext(token=token, component=tool)
+ try:
+ if not run_auth_checks(tool.auth, ctx):
+ continue
+ except AuthorizationError:
+ # Treat auth errors as denials in list operations
+ continue
+ all_tools[tool.key] = tool
return list(all_tools.values())
async def get_tool(self, name: str) -> Tool:
"""Get an enabled tool by name.
Queries all providers in parallel to find the tool.
- First provider wins. Returns only if enabled.
+ First provider wins. Returns only if enabled and authorized.
"""
results = await gather(
*[p.get_tool(name) for p in self._providers],
return_exceptions=True,
)
+ # Get auth context (skip_auth=True for STDIO which has no auth concept)
+ skip_auth, token = _get_auth_context()
+
for i, result in enumerate(results):
if isinstance(result, BaseException):
if not isinstance(result, NotFoundError):
@@ -891,6 +926,11 @@ async def get_tool(self, name: str) -> Tool:
)
continue
if isinstance(result, Tool) and self._is_component_enabled(result):
+ # Check tool-level auth (skip for STDIO)
+ if not skip_auth and result.auth is not None:
+ ctx = AuthContext(token=token, component=result)
+ if not run_auth_checks(result.auth, ctx):
+ continue
return result
raise NotFoundError(f"Unknown tool: {name!r}")
@@ -928,6 +968,9 @@ async def get_resources(self, *, run_middleware: bool = False) -> list[Resource]
return_exceptions=True,
)
+ # Get auth context (skip_auth=True for STDIO which has no auth concept)
+ skip_auth, token = _get_auth_context()
+
all_resources: dict[str, Resource] = {}
for i, result in enumerate(results):
if isinstance(result, BaseException):
@@ -937,24 +980,36 @@ async def get_resources(self, *, run_middleware: bool = False) -> list[Resource]
raise result
continue
for resource in result:
- if (
- self._is_component_enabled(resource)
- and resource.key not in all_resources
- ):
- all_resources[resource.key] = resource
+ if not self._is_component_enabled(resource):
+ continue
+ if resource.key in all_resources:
+ continue
+ # Check resource-level auth (skip for STDIO)
+ if not skip_auth and resource.auth is not None:
+ ctx = AuthContext(token=token, component=resource)
+ try:
+ if not run_auth_checks(resource.auth, ctx):
+ continue
+ except AuthorizationError:
+ # Treat auth errors as denials in list operations
+ continue
+ all_resources[resource.key] = resource
return list(all_resources.values())
async def get_resource(self, uri: str) -> Resource:
"""Get an enabled resource by URI.
Queries all providers in parallel to find the resource.
- First provider wins. Returns only if enabled.
+ First provider wins. Returns only if enabled and authorized.
"""
results = await gather(
*[p.get_resource(uri) for p in self._providers],
return_exceptions=True,
)
+ # Get auth context (skip_auth=True for STDIO which has no auth concept)
+ skip_auth, token = _get_auth_context()
+
for i, result in enumerate(results):
if isinstance(result, BaseException):
if not isinstance(result, NotFoundError):
@@ -963,6 +1018,11 @@ async def get_resource(self, uri: str) -> Resource:
)
continue
if isinstance(result, Resource) and self._is_component_enabled(result):
+ # Check resource-level auth (skip for STDIO)
+ if not skip_auth and result.auth is not None:
+ ctx = AuthContext(token=token, component=result)
+ if not run_auth_checks(result.auth, ctx):
+ continue
return result
raise NotFoundError(f"Unknown resource: {uri}")
@@ -1002,6 +1062,9 @@ async def get_resource_templates(
return_exceptions=True,
)
+ # Get auth context (skip_auth=True for STDIO which has no auth concept)
+ skip_auth, token = _get_auth_context()
+
all_templates: dict[str, ResourceTemplate] = {}
for i, result in enumerate(results):
if isinstance(result, BaseException):
@@ -1013,24 +1076,36 @@ async def get_resource_templates(
raise result
continue
for template in result:
- if (
- self._is_component_enabled(template)
- and template.key not in all_templates
- ):
- all_templates[template.key] = template
+ if not self._is_component_enabled(template):
+ continue
+ if template.key in all_templates:
+ continue
+ # Check template-level auth (skip for STDIO)
+ if not skip_auth and template.auth is not None:
+ ctx = AuthContext(token=token, component=template)
+ try:
+ if not run_auth_checks(template.auth, ctx):
+ continue
+ except AuthorizationError:
+ # Treat auth errors as denials in list operations
+ continue
+ all_templates[template.key] = template
return list(all_templates.values())
async def get_resource_template(self, uri: str) -> ResourceTemplate:
"""Get an enabled resource template that matches the given URI.
Queries all providers in parallel to find the template.
- First provider wins. Returns only if enabled.
+ First provider wins. Returns only if enabled and authorized.
"""
results = await gather(
*[p.get_resource_template(uri) for p in self._providers],
return_exceptions=True,
)
+ # Get auth context (skip_auth=True for STDIO which has no auth concept)
+ skip_auth, token = _get_auth_context()
+
for i, result in enumerate(results):
if isinstance(result, BaseException):
if not isinstance(result, NotFoundError):
@@ -1041,6 +1116,11 @@ async def get_resource_template(self, uri: str) -> ResourceTemplate:
if isinstance(result, ResourceTemplate) and self._is_component_enabled(
result
):
+ # Check template-level auth (skip for STDIO)
+ if not skip_auth and result.auth is not None:
+ ctx = AuthContext(token=token, component=result)
+ if not run_auth_checks(result.auth, ctx):
+ continue
return result
raise NotFoundError(f"Unknown resource template: {uri}")
@@ -1078,6 +1158,9 @@ async def get_prompts(self, *, run_middleware: bool = False) -> list[Prompt]:
return_exceptions=True,
)
+ # Get auth context (skip_auth=True for STDIO which has no auth concept)
+ skip_auth, token = _get_auth_context()
+
all_prompts: dict[str, Prompt] = {}
for i, result in enumerate(results):
if isinstance(result, BaseException):
@@ -1087,21 +1170,36 @@ async def get_prompts(self, *, run_middleware: bool = False) -> list[Prompt]:
raise result
continue
for prompt in result:
- if self._is_component_enabled(prompt) and prompt.key not in all_prompts:
- all_prompts[prompt.key] = prompt
+ if not self._is_component_enabled(prompt):
+ continue
+ if prompt.key in all_prompts:
+ continue
+ # Check prompt-level auth (skip for STDIO)
+ if not skip_auth and prompt.auth is not None:
+ ctx = AuthContext(token=token, component=prompt)
+ try:
+ if not run_auth_checks(prompt.auth, ctx):
+ continue
+ except AuthorizationError:
+ # Treat auth errors as denials in list operations
+ continue
+ all_prompts[prompt.key] = prompt
return list(all_prompts.values())
async def get_prompt(self, name: str) -> Prompt:
"""Get an enabled prompt by name.
Queries all providers in parallel to find the prompt.
- First provider wins. Returns only if enabled.
+ First provider wins. Returns only if enabled and authorized.
"""
results = await gather(
*[p.get_prompt(name) for p in self._providers],
return_exceptions=True,
)
+ # Get auth context (skip_auth=True for STDIO which has no auth concept)
+ skip_auth, token = _get_auth_context()
+
for i, result in enumerate(results):
if isinstance(result, BaseException):
if not isinstance(result, NotFoundError):
@@ -1110,6 +1208,11 @@ async def get_prompt(self, name: str) -> Prompt:
)
continue
if isinstance(result, Prompt) and self._is_component_enabled(result):
+ # Check prompt-level auth (skip for STDIO)
+ if not skip_auth and result.auth is not None:
+ ctx = AuthContext(token=token, component=result)
+ if not run_auth_checks(result.auth, ctx):
+ continue
return result
raise NotFoundError(f"Unknown prompt: {name}")
@@ -1756,6 +1859,7 @@ def tool(
exclude_args: list[str] | None = None,
meta: dict[str, Any] | None = None,
task: bool | TaskConfig | None = None,
+ auth: AuthCheckCallable | list[AuthCheckCallable] | None = None,
) -> FunctionTool: ...
@overload
@@ -1773,6 +1877,7 @@ def tool(
exclude_args: list[str] | None = None,
meta: dict[str, Any] | None = None,
task: bool | TaskConfig | None = None,
+ auth: AuthCheckCallable | list[AuthCheckCallable] | None = None,
) -> Callable[[AnyFunction], FunctionTool]: ...
def tool(
@@ -1789,6 +1894,7 @@ def tool(
exclude_args: list[str] | None = None,
meta: dict[str, Any] | None = None,
task: bool | TaskConfig | None = None,
+ auth: AuthCheckCallable | list[AuthCheckCallable] | None = None,
) -> (
Callable[[AnyFunction], FunctionTool]
| FunctionTool
@@ -1856,6 +1962,7 @@ def my_tool(x: int) -> str:
meta=meta,
task=task if task is not None else self._support_tasks_by_default,
serializer=self._tool_serializer,
+ auth=auth,
)
return result
@@ -1895,6 +2002,7 @@ def resource(
annotations: Annotations | dict[str, Any] | None = None,
meta: dict[str, Any] | None = None,
task: bool | TaskConfig | None = None,
+ auth: AuthCheckCallable | list[AuthCheckCallable] | None = None,
) -> Callable[[AnyFunction], Resource | ResourceTemplate]:
"""Decorator to register a function as a resource.
@@ -1959,6 +2067,7 @@ async def get_weather(city: str) -> str:
annotations=annotations,
meta=meta,
task=task if task is not None else self._support_tasks_by_default,
+ auth=auth,
)
def decorator(fn: AnyFunction) -> Resource | ResourceTemplate:
@@ -1989,6 +2098,7 @@ def prompt(
tags: set[str] | None = None,
meta: dict[str, Any] | None = None,
task: bool | TaskConfig | None = None,
+ auth: AuthCheckCallable | list[AuthCheckCallable] | None = None,
) -> FunctionPrompt: ...
@overload
@@ -2003,6 +2113,7 @@ def prompt(
tags: set[str] | None = None,
meta: dict[str, Any] | None = None,
task: bool | TaskConfig | None = None,
+ auth: AuthCheckCallable | list[AuthCheckCallable] | None = None,
) -> Callable[[AnyFunction], FunctionPrompt]: ...
def prompt(
@@ -2016,6 +2127,7 @@ def prompt(
tags: set[str] | None = None,
meta: dict[str, Any] | None = None,
task: bool | TaskConfig | None = None,
+ auth: AuthCheckCallable | list[AuthCheckCallable] | None = None,
) -> (
Callable[[AnyFunction], FunctionPrompt]
| FunctionPrompt
@@ -2099,6 +2211,7 @@ def another_prompt(data: str) -> list[Message]:
tags=tags,
meta=meta,
task=task if task is not None else self._support_tasks_by_default,
+ auth=auth,
)
async def run_stdio_async(
diff --git a/src/fastmcp/tools/tool.py b/src/fastmcp/tools/tool.py
index 576efefe28..c73555dafc 100644
--- a/src/fastmcp/tools/tool.py
+++ b/src/fastmcp/tools/tool.py
@@ -51,6 +51,10 @@
replace_type,
)
+# Runtime type alias for auth checks to avoid circular imports with authorization.py
+# AuthCheck is Callable[[AuthContext], bool] but we use Any to avoid the import
+AuthCheckCallable: TypeAlias = Callable[[Any], bool]
+
if TYPE_CHECKING:
from docket import Docket
from docket.execution import Execution
@@ -166,6 +170,10 @@ class Tool(FastMCPComponent):
description="Deprecated. Return ToolResult from your tools for full control over serialization."
),
] = None
+ auth: Annotated[
+ AuthCheckCallable | list[AuthCheckCallable] | None,
+ Field(description="Authorization checks for this tool", exclude=True),
+ ] = None
@model_validator(mode="after")
def _validate_tool_name(self) -> Tool:
@@ -215,6 +223,7 @@ def from_function(
serializer: ToolResultSerializerType | None = None, # Deprecated
meta: dict[str, Any] | None = None,
task: bool | TaskConfig | None = None,
+ auth: AuthCheckCallable | list[AuthCheckCallable] | None = None,
) -> FunctionTool:
"""Create a Tool from a function."""
return FunctionTool.from_function(
@@ -230,6 +239,7 @@ def from_function(
serializer=serializer,
meta=meta,
task=task,
+ auth=auth,
)
async def run(self, arguments: dict[str, Any]) -> ToolResult:
@@ -438,6 +448,7 @@ def from_function(
serializer: ToolResultSerializerType | None = None, # Deprecated
meta: dict[str, Any] | None = None,
task: bool | TaskConfig | None = None,
+ auth: AuthCheckCallable | list[AuthCheckCallable] | None = None,
) -> FunctionTool:
"""Create a Tool from a function."""
if serializer is not None and fastmcp.settings.deprecation_warnings:
@@ -500,6 +511,7 @@ def from_function(
serializer=serializer,
meta=meta,
task_config=task_config,
+ auth=auth,
)
async def run(self, arguments: dict[str, Any]) -> ToolResult:
@@ -793,6 +805,7 @@ def tool(
annotations: ToolAnnotations | dict[str, Any] | None = None,
meta: dict[str, Any] | None = None,
task: bool | TaskConfig | None = None,
+ auth: AuthCheckCallable | list[AuthCheckCallable] | None = None,
) -> Callable[[AnyFunction], FunctionTool]: ...
@@ -809,6 +822,7 @@ def tool(
annotations: ToolAnnotations | dict[str, Any] | None = None,
meta: dict[str, Any] | None = None,
task: bool | TaskConfig | None = None,
+ auth: AuthCheckCallable | list[AuthCheckCallable] | None = None,
) -> Callable[[AnyFunction], FunctionTool]: ...
@@ -824,6 +838,7 @@ def tool(
annotations: ToolAnnotations | dict[str, Any] | None = None,
meta: dict[str, Any] | None = None,
task: bool | TaskConfig | None = None,
+ auth: AuthCheckCallable | list[AuthCheckCallable] | None = None,
) -> (
Callable[[AnyFunction], FunctionTool]
| FunctionTool
@@ -917,6 +932,7 @@ def search(query: str) -> list[dict]:
annotations=annotations,
meta=meta,
task=supports_task,
+ auth=auth,
)
elif isinstance(name_or_fn, str):
@@ -947,4 +963,5 @@ def search(query: str) -> list[dict]:
annotations=annotations,
meta=meta,
task=task,
+ auth=auth,
)
diff --git a/src/fastmcp/tools/tool_transform.py b/src/fastmcp/tools/tool_transform.py
index b05f867689..b0700eea84 100644
--- a/src/fastmcp/tools/tool_transform.py
+++ b/src/fastmcp/tools/tool_transform.py
@@ -594,6 +594,7 @@ async def custom_output(**kwargs) -> ToolResult:
serializer=final_serializer,
meta=final_meta,
transform_args=transform_args,
+ auth=tool.auth,
)
return transformed_tool
diff --git a/tests/server/auth/test_authorization.py b/tests/server/auth/test_authorization.py
new file mode 100644
index 0000000000..55acd5e7b1
--- /dev/null
+++ b/tests/server/auth/test_authorization.py
@@ -0,0 +1,604 @@
+"""Tests for authorization checks and AuthMiddleware."""
+
+from unittest.mock import Mock
+
+import pytest
+from mcp.server.auth.middleware.auth_context import auth_context_var
+from mcp.server.auth.middleware.bearer_auth import AuthenticatedUser
+
+from fastmcp import FastMCP
+from fastmcp.client import Client
+from fastmcp.exceptions import NotFoundError
+from fastmcp.server.auth import (
+ AccessToken,
+ AuthContext,
+ require_auth,
+ require_scopes,
+ restrict_tag,
+ run_auth_checks,
+)
+from fastmcp.server.middleware import AuthMiddleware
+
+# =============================================================================
+# Test helpers
+# =============================================================================
+
+
+def make_token(scopes: list[str] | None = None) -> AccessToken:
+ """Create a test access token."""
+ return AccessToken(
+ token="test-token",
+ client_id="test-client",
+ scopes=scopes or [],
+ expires_at=None,
+ claims={},
+ )
+
+
+def make_tool() -> Mock:
+ """Create a mock tool for testing."""
+ tool = Mock()
+ tool.tags = set()
+ return tool
+
+
+# =============================================================================
+# Tests for require_auth
+# =============================================================================
+
+
+class TestRequireAuth:
+ def test_returns_true_with_token(self):
+ ctx = AuthContext(token=make_token(), component=make_tool())
+ assert require_auth(ctx) is True
+
+ def test_returns_false_without_token(self):
+ ctx = AuthContext(token=None, component=make_tool())
+ assert require_auth(ctx) is False
+
+
+# =============================================================================
+# Tests for require_scopes
+# =============================================================================
+
+
+class TestRequireScopes:
+ def test_returns_true_with_matching_scope(self):
+ token = make_token(scopes=["admin"])
+ ctx = AuthContext(token=token, component=make_tool())
+ check = require_scopes("admin")
+ assert check(ctx) is True
+
+ def test_returns_true_with_all_required_scopes(self):
+ token = make_token(scopes=["read", "write", "admin"])
+ ctx = AuthContext(token=token, component=make_tool())
+ check = require_scopes("read", "write")
+ assert check(ctx) is True
+
+ def test_returns_false_with_missing_scope(self):
+ token = make_token(scopes=["read"])
+ ctx = AuthContext(token=token, component=make_tool())
+ check = require_scopes("admin")
+ assert check(ctx) is False
+
+ def test_returns_false_with_partial_scopes(self):
+ token = make_token(scopes=["read"])
+ ctx = AuthContext(token=token, component=make_tool())
+ check = require_scopes("read", "write")
+ assert check(ctx) is False
+
+ def test_returns_false_without_token(self):
+ ctx = AuthContext(token=None, component=make_tool())
+ check = require_scopes("admin")
+ assert check(ctx) is False
+
+
+# =============================================================================
+# Tests for restrict_tag
+# =============================================================================
+
+
+class TestRestrictTag:
+ def test_allows_access_when_tag_not_present(self):
+ tool = make_tool()
+ tool.tags = {"other"}
+ ctx = AuthContext(token=None, component=tool)
+ check = restrict_tag("admin", scopes=["admin"])
+ assert check(ctx) is True
+
+ def test_blocks_access_when_tag_present_without_token(self):
+ tool = make_tool()
+ tool.tags = {"admin"}
+ ctx = AuthContext(token=None, component=tool)
+ check = restrict_tag("admin", scopes=["admin"])
+ assert check(ctx) is False
+
+ def test_blocks_access_when_tag_present_without_scope(self):
+ tool = make_tool()
+ tool.tags = {"admin"}
+ token = make_token(scopes=["read"])
+ ctx = AuthContext(token=token, component=tool)
+ check = restrict_tag("admin", scopes=["admin"])
+ assert check(ctx) is False
+
+ def test_allows_access_when_tag_present_with_scope(self):
+ tool = make_tool()
+ tool.tags = {"admin"}
+ token = make_token(scopes=["admin"])
+ ctx = AuthContext(token=token, component=tool)
+ check = restrict_tag("admin", scopes=["admin"])
+ assert check(ctx) is True
+
+
+# =============================================================================
+# Tests for run_auth_checks
+# =============================================================================
+
+
+class TestRunAuthChecks:
+ def test_single_check_passes(self):
+ ctx = AuthContext(token=make_token(), component=make_tool())
+ assert run_auth_checks(require_auth, ctx) is True
+
+ def test_single_check_fails(self):
+ ctx = AuthContext(token=None, component=make_tool())
+ assert run_auth_checks(require_auth, ctx) is False
+
+ def test_multiple_checks_all_pass(self):
+ token = make_token(scopes=["admin"])
+ ctx = AuthContext(token=token, component=make_tool())
+ checks = [require_auth, require_scopes("admin")]
+ assert run_auth_checks(checks, ctx) is True
+
+ def test_multiple_checks_one_fails(self):
+ token = make_token(scopes=["read"])
+ ctx = AuthContext(token=token, component=make_tool())
+ checks = [require_auth, require_scopes("admin")]
+ assert run_auth_checks(checks, ctx) is False
+
+ def test_empty_list_passes(self):
+ ctx = AuthContext(token=None, component=make_tool())
+ assert run_auth_checks([], ctx) is True
+
+ def test_custom_lambda_check(self):
+ token = make_token()
+ token.claims = {"level": 5}
+ ctx = AuthContext(token=token, component=make_tool())
+
+ def check(ctx: AuthContext) -> bool:
+ return ctx.token is not None and ctx.token.claims.get("level", 0) >= 3
+
+ assert run_auth_checks(check, ctx) is True
+
+ def test_authorization_error_propagates(self):
+ """AuthorizationError from auth check should propagate with custom message."""
+ from fastmcp.exceptions import AuthorizationError
+
+ def custom_auth_check(ctx: AuthContext) -> bool:
+ raise AuthorizationError("Custom denial reason")
+
+ ctx = AuthContext(token=make_token(), component=make_tool())
+ with pytest.raises(AuthorizationError, match="Custom denial reason"):
+ run_auth_checks(custom_auth_check, ctx)
+
+ def test_generic_exception_is_masked(self):
+ """Generic exceptions from auth checks should be masked (return False)."""
+
+ def buggy_auth_check(ctx: AuthContext) -> bool:
+ raise ValueError("Unexpected internal error")
+
+ ctx = AuthContext(token=make_token(), component=make_tool())
+ # Should return False, not raise the ValueError
+ assert run_auth_checks(buggy_auth_check, ctx) is False
+
+ def test_authorization_error_stops_chain(self):
+ """AuthorizationError should stop the check chain and propagate."""
+ from fastmcp.exceptions import AuthorizationError
+
+ call_order = []
+
+ def check_1(ctx: AuthContext) -> bool:
+ call_order.append(1)
+ return True
+
+ def check_2(ctx: AuthContext) -> bool:
+ call_order.append(2)
+ raise AuthorizationError("Explicit denial")
+
+ def check_3(ctx: AuthContext) -> bool:
+ call_order.append(3)
+ return True
+
+ ctx = AuthContext(token=make_token(), component=make_tool())
+ with pytest.raises(AuthorizationError, match="Explicit denial"):
+ run_auth_checks([check_1, check_2, check_3], ctx)
+
+ # Check 3 should not be called
+ assert call_order == [1, 2]
+
+
+# =============================================================================
+# Tests for tool-level auth with FastMCP
+# =============================================================================
+
+
+def set_token(token: AccessToken | None):
+ """Set the access token in the auth context var."""
+ if token is None:
+ return auth_context_var.set(None)
+ return auth_context_var.set(AuthenticatedUser(token))
+
+
+class TestToolLevelAuth:
+ async def test_tool_without_auth_is_visible(self):
+ mcp = FastMCP()
+
+ @mcp.tool
+ def public_tool() -> str:
+ return "public"
+
+ tools = await mcp.get_tools()
+ assert len(tools) == 1
+ assert tools[0].name == "public_tool"
+
+ async def test_tool_with_auth_hidden_without_token(self):
+ mcp = FastMCP()
+
+ @mcp.tool(auth=require_auth)
+ def protected_tool() -> str:
+ return "protected"
+
+ # No token set - tool should be hidden
+ tools = await mcp.get_tools()
+ assert len(tools) == 0
+
+ async def test_tool_with_auth_visible_with_token(self):
+ mcp = FastMCP()
+
+ @mcp.tool(auth=require_auth)
+ def protected_tool() -> str:
+ return "protected"
+
+ # Set token in context
+ token = make_token()
+ tok = set_token(token)
+ try:
+ tools = await mcp.get_tools()
+ assert len(tools) == 1
+ assert tools[0].name == "protected_tool"
+ finally:
+ auth_context_var.reset(tok)
+
+ async def test_tool_with_scope_auth_hidden_without_scope(self):
+ mcp = FastMCP()
+
+ @mcp.tool(auth=require_scopes("admin"))
+ def admin_tool() -> str:
+ return "admin"
+
+ # Token without admin scope
+ token = make_token(scopes=["read"])
+ tok = set_token(token)
+ try:
+ tools = await mcp.get_tools()
+ assert len(tools) == 0
+ finally:
+ auth_context_var.reset(tok)
+
+ async def test_tool_with_scope_auth_visible_with_scope(self):
+ mcp = FastMCP()
+
+ @mcp.tool(auth=require_scopes("admin"))
+ def admin_tool() -> str:
+ return "admin"
+
+ # Token with admin scope
+ token = make_token(scopes=["admin"])
+ tok = set_token(token)
+ try:
+ tools = await mcp.get_tools()
+ assert len(tools) == 1
+ assert tools[0].name == "admin_tool"
+ finally:
+ auth_context_var.reset(tok)
+
+ async def test_get_tool_returns_not_found_without_auth(self):
+ mcp = FastMCP()
+
+ @mcp.tool(auth=require_auth)
+ def protected_tool() -> str:
+ return "protected"
+
+ with pytest.raises(NotFoundError):
+ await mcp.get_tool("protected_tool")
+
+ async def test_get_tool_returns_tool_with_auth(self):
+ mcp = FastMCP()
+
+ @mcp.tool(auth=require_auth)
+ def protected_tool() -> str:
+ return "protected"
+
+ token = make_token()
+ tok = set_token(token)
+ try:
+ tool = await mcp.get_tool("protected_tool")
+ assert tool.name == "protected_tool"
+ finally:
+ auth_context_var.reset(tok)
+
+
+# =============================================================================
+# Tests for AuthMiddleware
+# =============================================================================
+
+
+class TestAuthMiddleware:
+ async def test_middleware_filters_tools_without_token(self):
+ mcp = FastMCP(middleware=[AuthMiddleware(auth=require_auth)])
+
+ @mcp.tool
+ def public_tool() -> str:
+ return "public"
+
+ # No token - all tools filtered by middleware
+ tools = await mcp.get_tools(run_middleware=True)
+ assert len(tools) == 0
+
+ async def test_middleware_allows_tools_with_token(self):
+ mcp = FastMCP(middleware=[AuthMiddleware(auth=require_auth)])
+
+ @mcp.tool
+ def public_tool() -> str:
+ return "public"
+
+ token = make_token()
+ tok = set_token(token)
+ try:
+ tools = await mcp.get_tools(run_middleware=True)
+ assert len(tools) == 1
+ finally:
+ auth_context_var.reset(tok)
+
+ async def test_middleware_with_scope_check(self):
+ mcp = FastMCP(middleware=[AuthMiddleware(auth=require_scopes("api"))])
+
+ @mcp.tool
+ def api_tool() -> str:
+ return "api"
+
+ # Token without api scope
+ token = make_token(scopes=["read"])
+ tok = set_token(token)
+ try:
+ tools = await mcp.get_tools(run_middleware=True)
+ assert len(tools) == 0
+ finally:
+ auth_context_var.reset(tok)
+
+ # Token with api scope
+ token = make_token(scopes=["api"])
+ tok = set_token(token)
+ try:
+ tools = await mcp.get_tools(run_middleware=True)
+ assert len(tools) == 1
+ finally:
+ auth_context_var.reset(tok)
+
+ async def test_middleware_with_restrict_tag(self):
+ mcp = FastMCP(
+ middleware=[AuthMiddleware(auth=restrict_tag("admin", scopes=["admin"]))]
+ )
+
+ @mcp.tool
+ def public_tool() -> str:
+ return "public"
+
+ @mcp.tool(tags={"admin"})
+ def admin_tool() -> str:
+ return "admin"
+
+ # No token - public tool allowed, admin tool blocked
+ tools = await mcp.get_tools(run_middleware=True)
+ assert len(tools) == 1
+ assert tools[0].name == "public_tool"
+
+ # Token with admin scope - both allowed
+ token = make_token(scopes=["admin"])
+ tok = set_token(token)
+ try:
+ tools = await mcp.get_tools(run_middleware=True)
+ assert len(tools) == 2
+ finally:
+ auth_context_var.reset(tok)
+
+
+# =============================================================================
+# Integration tests with Client
+# =============================================================================
+
+
+class TestAuthIntegration:
+ async def test_client_only_sees_authorized_tools(self):
+ mcp = FastMCP()
+
+ @mcp.tool
+ def public_tool() -> str:
+ return "public"
+
+ @mcp.tool(auth=require_auth)
+ def protected_tool() -> str:
+ return "protected"
+
+ async with Client(mcp) as client:
+ # No token - only public tool visible
+ tools = await client.list_tools()
+ assert len(tools) == 1
+ assert tools[0].name == "public_tool"
+
+ async def test_client_with_token_sees_all_authorized_tools(self):
+ mcp = FastMCP()
+
+ @mcp.tool
+ def public_tool() -> str:
+ return "public"
+
+ @mcp.tool(auth=require_auth)
+ def protected_tool() -> str:
+ return "protected"
+
+ # Set token before creating client
+ token = make_token()
+ tok = set_token(token)
+ try:
+ async with Client(mcp) as client:
+ tools = await client.list_tools()
+ tool_names = [t.name for t in tools]
+ # With token, both tools should be visible
+ assert "public_tool" in tool_names
+ assert "protected_tool" in tool_names
+ finally:
+ auth_context_var.reset(tok)
+
+
+# =============================================================================
+# Tests for transformed tools preserving auth
+# =============================================================================
+
+
+class TestTransformedToolAuth:
+ async def test_transformed_tool_preserves_auth(self):
+ """Transformed tools should inherit auth from parent."""
+ from fastmcp.tools.tool_transform import TransformedTool
+
+ mcp = FastMCP()
+
+ @mcp.tool(auth=require_auth)
+ def protected_tool(x: int) -> str:
+ return str(x)
+
+ # Get the tool and transform it
+ tools = await mcp._local_provider.list_tools()
+ original_tool = tools[0]
+ assert original_tool.auth is not None
+
+ # Transform the tool
+ transformed = TransformedTool.from_tool(
+ original_tool,
+ name="transformed_protected",
+ )
+
+ # Auth should be preserved
+ assert transformed.auth is not None
+ assert transformed.auth == original_tool.auth
+
+ async def test_transformed_tool_filtered_without_token(self):
+ """Transformed tools with auth should be filtered without token."""
+ from fastmcp.tools.tool_transform import ToolTransformConfig
+
+ mcp = FastMCP()
+
+ @mcp.tool(auth=require_auth)
+ def protected_tool(x: int) -> str:
+ return str(x)
+
+ # Add transformation
+ mcp.add_tool_transformation(
+ "protected_tool", ToolTransformConfig(name="renamed_protected")
+ )
+
+ # Without token, transformed tool should not be visible
+ tools = await mcp.get_tools()
+ assert len(tools) == 0
+
+ async def test_transformed_tool_visible_with_token(self):
+ """Transformed tools with auth should be visible with token."""
+ from fastmcp.tools.tool_transform import ToolTransformConfig
+
+ mcp = FastMCP()
+
+ @mcp.tool(auth=require_auth)
+ def protected_tool(x: int) -> str:
+ return str(x)
+
+ # Add transformation
+ mcp.add_tool_transformation(
+ "protected_tool", ToolTransformConfig(name="renamed_protected")
+ )
+
+ # With token, transformed tool should be visible
+ token = make_token()
+ tok = set_token(token)
+ try:
+ tools = await mcp.get_tools()
+ assert len(tools) == 1
+ assert tools[0].name == "renamed_protected"
+ finally:
+ auth_context_var.reset(tok)
+
+
+# =============================================================================
+# Tests for AuthMiddleware on_call_tool enforcement
+# =============================================================================
+
+
+class TestAuthMiddlewareCallTool:
+ async def test_middleware_blocks_call_without_auth(self):
+ """AuthMiddleware should raise AuthorizationError on unauthorized call."""
+
+ mcp = FastMCP(middleware=[AuthMiddleware(auth=require_auth)])
+
+ @mcp.tool
+ def my_tool() -> str:
+ return "result"
+
+ # Without token, calling the tool should raise AuthorizationError
+ async with Client(mcp) as client:
+ with pytest.raises(Exception) as exc_info:
+ await client.call_tool("my_tool", {})
+ # The error message should indicate authorization failure
+ assert (
+ "authorization" in str(exc_info.value).lower()
+ or "insufficient" in str(exc_info.value).lower()
+ )
+
+ async def test_middleware_allows_call_with_auth(self):
+ """AuthMiddleware should allow tool call with valid token."""
+ mcp = FastMCP(middleware=[AuthMiddleware(auth=require_auth)])
+
+ @mcp.tool
+ def my_tool() -> str:
+ return "result"
+
+ # With token, calling the tool should succeed
+ token = make_token()
+ tok = set_token(token)
+ try:
+ async with Client(mcp) as client:
+ result = await client.call_tool("my_tool", {})
+ assert result.content[0].text == "result"
+ finally:
+ auth_context_var.reset(tok)
+
+ async def test_middleware_blocks_call_with_wrong_scope(self):
+ """AuthMiddleware should block calls when scope requirements aren't met."""
+
+ mcp = FastMCP(middleware=[AuthMiddleware(auth=require_scopes("admin"))])
+
+ @mcp.tool
+ def admin_tool() -> str:
+ return "admin result"
+
+ # With token that lacks admin scope
+ token = make_token(scopes=["read"])
+ tok = set_token(token)
+ try:
+ async with Client(mcp) as client:
+ with pytest.raises(Exception) as exc_info:
+ await client.call_tool("admin_tool", {})
+ assert (
+ "authorization" in str(exc_info.value).lower()
+ or "insufficient" in str(exc_info.value).lower()
+ )
+ finally:
+ auth_context_var.reset(tok)
diff --git a/uv.lock b/uv.lock
index 600f0392f4..30cbf4e213 100644
--- a/uv.lock
+++ b/uv.lock
@@ -723,6 +723,7 @@ dev = [
{ name = "inline-snapshot", extra = ["dirty-equals"] },
{ name = "ipython", version = "8.37.0", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.11'" },
{ name = "ipython", version = "9.8.0", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.11'" },
+ { name = "loq" },
{ name = "pdbpp" },
{ name = "prek" },
{ name = "psutil" },
@@ -774,6 +775,7 @@ dev = [
{ name = "fastmcp", extras = ["anthropic", "openai", "tasks"] },
{ name = "inline-snapshot", extras = ["dirty-equals"], specifier = ">=0.27.2" },
{ name = "ipython", specifier = ">=8.12.3" },
+ { name = "loq", specifier = ">=0.1.0a3" },
{ name = "pdbpp", specifier = ">=0.11.7" },
{ name = "prek", specifier = ">=0.2.12" },
{ name = "psutil", specifier = ">=7.0.0" },
@@ -1175,6 +1177,21 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/81/db/e655086b7f3a705df045bf0933bdd9c2f79bb3c97bfef1384598bb79a217/keyring-25.7.0-py3-none-any.whl", hash = "sha256:be4a0b195f149690c166e850609a477c532ddbfbaed96a404d4e43f8d5e2689f", size = 39160, upload-time = "2025-11-16T16:26:08.402Z" },
]
+[[package]]
+name = "loq"
+version = "0.1.0a3"
+source = { registry = "https://pypi.org/simple" }
+sdist = { url = "https://files.pythonhosted.org/packages/87/2d/911c0ca3175aa30358eb0b1dd4789e3695292db63dc50da34be0d8aa001a/loq-0.1.0a3.tar.gz", hash = "sha256:8f34ed32eed79d3257fc6c6e2ffc4970795bae0fbd7dee95cdb027d351a87839", size = 49278, upload-time = "2026-01-12T05:13:34.14Z" }
+wheels = [
+ { url = "https://files.pythonhosted.org/packages/6a/7c/d028a1041caef85844b7a68926138daedac6ab4aa720f6cb8c77a30dcb49/loq-0.1.0a3-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:e65cb220446f1a2fad5c0e3e0c97715204652cb3896b35bb50967745d1f74ade", size = 1285120, upload-time = "2026-01-12T05:13:29.418Z" },
+ { url = "https://files.pythonhosted.org/packages/91/49/cfa207080940c9d447ca4633d7a25790d50e936de29d154dcc1941c45e54/loq-0.1.0a3-py3-none-macosx_11_0_arm64.whl", hash = "sha256:20afc0638df897312ea33f064cd8d05ee437c77ff6fca890ac33d41cc5fe9dbd", size = 1212512, upload-time = "2026-01-12T05:13:26.353Z" },
+ { url = "https://files.pythonhosted.org/packages/80/d9/68f2418009771b764fc6fcbd379892464c7d96cf668ea570f8b3d6e93806/loq-0.1.0a3-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:efb606799326bdba9e0931ae6664076eb24ea9ce29f540ec62071f3da7d89753", size = 1232624, upload-time = "2026-01-12T05:13:30.431Z" },
+ { url = "https://files.pythonhosted.org/packages/cc/27/10e195de9e4e843fed4a098f4a7b3e2b78a0218d317adb3ea83b341fa8e9/loq-0.1.0a3-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:63a146821abc50f9166fdec83a92330ddec81d78fc02d0956cf00957fc02f4e6", size = 1324994, upload-time = "2026-01-12T05:13:27.895Z" },
+ { url = "https://files.pythonhosted.org/packages/98/09/76677afb5f9d380a89cf59489f03cdb7c1c6318d1c81c6fc3073cce93893/loq-0.1.0a3-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:afc46f65303b9ff64ae0e517f4834cf0e5b97cd5b375cbfd66dadf267f94f725", size = 1272635, upload-time = "2026-01-12T05:13:23.759Z" },
+ { url = "https://files.pythonhosted.org/packages/65/77/fda09acd73fc44f6a7cfa9851dd8d8fdab0829311b9c7fa1f1af30b3affa/loq-0.1.0a3-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:3e5501abcbab67d14e6c71a6c010609430a9495a888e10047d5542ffd46acd65", size = 1389293, upload-time = "2026-01-12T05:13:32.655Z" },
+ { url = "https://files.pythonhosted.org/packages/39/68/2dbb5e29e0333f036b92b09f6b54bbd8e68b51b780051f01e15c307b6ab1/loq-0.1.0a3-py3-none-win_amd64.whl", hash = "sha256:51cd849b94f09d211bb842809cce4172ee0904092be2643bdb9629001745f3b9", size = 1283650, upload-time = "2026-01-12T05:13:25.042Z" },
+]
+
[[package]]
name = "lupa"
version = "2.6"