From a17a4142d8f032cee80c7b57ff95dd0dfe931709 Mon Sep 17 00:00:00 2001
From: Jeremiah Lowin <153965+jlowin@users.noreply.github.com>
Date: Mon, 12 Jan 2026 16:59:47 -0500
Subject: [PATCH 1/8] Add callable-based tool authorization with STDIO bypass
Tools can now specify auth checks via the `auth` parameter:
```python
@mcp.tool(auth=require_auth)
def protected_tool(): ...
@mcp.tool(auth=require_scopes("admin"))
def admin_tool(): ...
```
Auth checks filter tools from list_tools and enforce at call_tool time.
STDIO transport bypasses all auth (no OAuth concept in local transport).
AuthMiddleware provides server-wide enforcement.
---
docs/servers/auth/authorization.mdx | 382 +++++++++++
src/fastmcp/exceptions.py | 4 +
src/fastmcp/server/auth/__init__.py | 14 +
src/fastmcp/server/auth/authorization.py | 177 ++++++
src/fastmcp/server/middleware/__init__.py | 2 +
.../server/middleware/authorization.py | 144 +++++
.../server/providers/local_provider.py | 7 +-
src/fastmcp/server/server.py | 41 +-
src/fastmcp/tools/tool.py | 17 +
src/fastmcp/tools/tool_transform.py | 1 +
tests/server/auth/test_authorization.py | 598 ++++++++++++++++++
uv.lock | 17 +
12 files changed, 1398 insertions(+), 6 deletions(-)
create mode 100644 docs/servers/auth/authorization.mdx
create mode 100644 src/fastmcp/server/auth/authorization.py
create mode 100644 src/fastmcp/server/middleware/authorization.py
create mode 100644 tests/server/auth/test_authorization.py
diff --git a/docs/servers/auth/authorization.mdx b/docs/servers/auth/authorization.mdx
new file mode 100644
index 0000000000..e8f0929c35
--- /dev/null
+++ b/docs/servers/auth/authorization.mdx
@@ -0,0 +1,382 @@
+---
+title: Authorization
+sidebarTitle: Authorization
+description: Control access to tools using callable-based authorization checks that filter visibility and enforce permissions.
+icon: shield-halved
+---
+
+import { VersionBadge } from "/snippets/version-badge.mdx"
+
+
+
+Authorization controls what authenticated users can do with your FastMCP server. While [authentication](/servers/auth/authentication) verifies identity (who you are), authorization determines access (what you can do). FastMCP provides a callable-based authorization system that works at both the tool level and globally via middleware.
+
+The authorization model centers on a simple concept: callable functions that receive context about the current request and return `True` to allow access or `False` to deny it. Multiple checks combine with AND logic, meaning all checks must pass for access to be granted.
+
+
+Authorization relies on OAuth tokens which are only available with HTTP transports (SSE, Streamable HTTP). In STDIO mode, there's no OAuth mechanism, so `get_access_token()` returns `None`. Tools with auth checks like `require_auth` will be hidden from STDIO clients.
+
+
+## Auth Checks
+
+An auth check is any callable that accepts an `AuthContext` and returns a boolean. The `AuthContext` provides access to the current token (if any) and the tool being accessed.
+
+```python
+from fastmcp.server.auth import AuthContext
+
+def my_custom_check(ctx: AuthContext) -> bool:
+ # ctx.token is AccessToken | None
+ # ctx.tool is the Tool being accessed
+ return ctx.token is not None and "special" in ctx.token.scopes
+```
+
+FastMCP provides three built-in auth checks that cover common authorization patterns.
+
+### require_auth
+
+The simplest check verifies that any valid authentication token is present. Unauthenticated requests are denied.
+
+```python
+from fastmcp import FastMCP
+from fastmcp.server.auth import require_auth
+
+mcp = FastMCP("Protected Server")
+
+@mcp.tool(auth=require_auth)
+def protected_operation() -> str:
+ """Only accessible to authenticated users."""
+ return "Success"
+```
+
+### require_scopes
+
+For scope-based authorization, `require_scopes` checks that the token contains all specified OAuth scopes. When multiple scopes are provided, all must be present (AND logic).
+
+```python
+from fastmcp import FastMCP
+from fastmcp.server.auth import require_scopes
+
+mcp = FastMCP("Scoped Server")
+
+@mcp.tool(auth=require_scopes("admin"))
+def admin_operation() -> str:
+ """Requires the 'admin' scope."""
+ return "Admin action completed"
+
+@mcp.tool(auth=require_scopes("read", "write"))
+def read_write_operation() -> str:
+ """Requires both 'read' AND 'write' scopes."""
+ return "Read/write action completed"
+```
+
+### restrict_tag
+
+Tag-based restrictions apply scope requirements conditionally. If a tool has the specified tag, the token must have the required scopes. Tools without the tag are unaffected.
+
+```python
+from fastmcp import FastMCP
+from fastmcp.server.auth import restrict_tag
+from fastmcp.server.middleware import AuthMiddleware
+
+mcp = FastMCP(
+ "Tagged Server",
+ middleware=[
+ AuthMiddleware(auth=restrict_tag("admin", scopes=["admin"]))
+ ]
+)
+
+@mcp.tool(tags={"admin"})
+def admin_tool() -> str:
+ """Tagged 'admin', so requires 'admin' scope."""
+ return "Admin only"
+
+@mcp.tool(tags={"public"})
+def public_tool() -> str:
+ """Not tagged 'admin', so no scope required by the restriction."""
+ return "Anyone can access"
+```
+
+## Combining Checks
+
+Multiple auth checks can be combined by passing a list. All checks must pass for authorization to succeed (AND logic).
+
+```python
+from fastmcp import FastMCP
+from fastmcp.server.auth import require_auth, require_scopes
+
+mcp = FastMCP("Combined Auth Server")
+
+@mcp.tool(auth=[require_auth, require_scopes("admin")])
+def secure_admin_action() -> str:
+ """Requires authentication AND the 'admin' scope."""
+ return "Secure admin action"
+```
+
+## Custom Auth Checks
+
+Any callable that accepts `AuthContext` and returns `bool` can serve as an auth check. This enables authorization logic based on token claims, tool metadata, or external systems.
+
+```python
+from fastmcp import FastMCP
+from fastmcp.server.auth import AuthContext
+
+mcp = FastMCP("Custom Auth Server")
+
+def require_premium_user(ctx: AuthContext) -> bool:
+ """Check for premium user status in token claims."""
+ if ctx.token is None:
+ return False
+ return ctx.token.claims.get("premium", False) is True
+
+def require_access_level(minimum_level: int):
+ """Factory function for level-based authorization."""
+ def check(ctx: AuthContext) -> bool:
+ if ctx.token is None:
+ return False
+ user_level = ctx.token.claims.get("level", 0)
+ return user_level >= minimum_level
+ return check
+
+@mcp.tool(auth=require_premium_user)
+def premium_feature() -> str:
+ """Only for premium users."""
+ return "Premium content"
+
+@mcp.tool(auth=require_access_level(5))
+def advanced_feature() -> str:
+ """Requires access level 5 or higher."""
+ return "Advanced feature"
+```
+
+Lambda functions work for simple inline checks.
+
+```python
+from fastmcp import FastMCP
+
+mcp = FastMCP("Lambda Auth Server")
+
+@mcp.tool(auth=lambda ctx: ctx.token and ctx.token.claims.get("level", 0) >= 5)
+def level_restricted_tool() -> str:
+ """Requires level 5+ in token claims."""
+ return "High level access granted"
+```
+
+### Exception Handling in Auth Checks
+
+Auth checks can raise exceptions for explicit denial with custom messages:
+
+- **`AuthorizationError`**: Propagates with its custom message, useful for explaining why access was denied
+- **Other exceptions**: Masked for security (logged internally, treated as denial)
+
+```python
+from fastmcp import FastMCP
+from fastmcp.server.auth import AuthContext
+from fastmcp.exceptions import AuthorizationError
+
+mcp = FastMCP("Exception Auth Server")
+
+def require_verified_email(ctx: AuthContext) -> bool:
+ """Require verified email with explicit denial message."""
+ if ctx.token is None:
+ raise AuthorizationError("Authentication required")
+ if not ctx.token.claims.get("email_verified"):
+ raise AuthorizationError("Email verification required to access this tool")
+ return True
+
+@mcp.tool(auth=require_verified_email)
+def sensitive_operation() -> str:
+ """Only accessible to users with verified email."""
+ return "Operation completed"
+```
+
+This pattern is useful when you want to provide actionable feedback to clients about why authorization failed.
+
+## Tool-Level Authorization
+
+The `auth` parameter on `@mcp.tool` controls visibility of individual tools. When a tool has auth checks that fail for the current request, it is hidden from `list_tools` responses. The tool simply doesn't appear in the available tools list.
+
+```python
+from fastmcp import FastMCP
+from fastmcp.server.auth import require_auth, require_scopes
+
+mcp = FastMCP("Tool Auth Server")
+
+@mcp.tool
+def public_tool() -> str:
+ """Visible to everyone."""
+ return "Public"
+
+@mcp.tool(auth=require_auth)
+def authenticated_tool() -> str:
+ """Only visible to authenticated users."""
+ return "Authenticated"
+
+@mcp.tool(auth=require_scopes("admin"))
+def admin_tool() -> str:
+ """Only visible to users with 'admin' scope."""
+ return "Admin"
+```
+
+An unauthenticated client sees only `public_tool`. An authenticated client without the "admin" scope sees `public_tool` and `authenticated_tool`. An authenticated client with the "admin" scope sees all three tools.
+
+
+Tool-level `auth` only controls visibility in `list_tools`. It does not block direct tool calls. If a client knows a tool name, they can attempt to call it directly. Use `AuthMiddleware` to enforce authorization on tool execution.
+
+
+## AuthMiddleware
+
+For server-wide authorization enforcement, use `AuthMiddleware`. This middleware applies auth checks globally to all tools and provides two critical functions: filtering `list_tools` responses and enforcing authorization on `call_tool` requests.
+
+```python
+from fastmcp import FastMCP
+from fastmcp.server.auth import require_auth
+from fastmcp.server.middleware import AuthMiddleware
+
+mcp = FastMCP(
+ "Enforced Auth Server",
+ middleware=[AuthMiddleware(auth=require_auth)]
+)
+
+@mcp.tool
+def any_tool() -> str:
+ """Requires authentication to see AND call."""
+ return "Protected"
+```
+
+### Filtering vs Enforcement
+
+Understanding the difference between tool-level auth and `AuthMiddleware` is important for building secure servers:
+
+| Behavior | Tool-level `auth` | `AuthMiddleware` |
+|----------|-------------------|------------------|
+| Filters `list_tools` | Yes | Yes |
+| Blocks `call_tool` | No | Yes (raises `AuthorizationError`) |
+
+Tool-level auth is useful for hiding tools from unauthorized users while still allowing advanced clients to call them if needed. `AuthMiddleware` provides complete enforcement by raising `AuthorizationError` when unauthorized requests attempt to execute tools.
+
+### Tag-Based Global Authorization
+
+A common pattern uses `restrict_tag` with `AuthMiddleware` to apply scope requirements based on tool tags.
+
+```python
+from fastmcp import FastMCP
+from fastmcp.server.auth import restrict_tag
+from fastmcp.server.middleware import AuthMiddleware
+
+mcp = FastMCP(
+ "Tag-Based Auth Server",
+ middleware=[
+ AuthMiddleware(auth=restrict_tag("admin", scopes=["admin"])),
+ AuthMiddleware(auth=restrict_tag("write", scopes=["write"])),
+ ]
+)
+
+@mcp.tool(tags={"admin"})
+def delete_all_data() -> str:
+ """Requires 'admin' scope."""
+ return "Deleted"
+
+@mcp.tool(tags={"write"})
+def update_record(id: str, data: str) -> str:
+ """Requires 'write' scope."""
+ return f"Updated {id}"
+
+@mcp.tool
+def read_record(id: str) -> str:
+ """No tag restrictions, accessible to all authenticated users."""
+ return f"Record {id}"
+```
+
+## Accessing Tokens in Tools
+
+Tools can access the current authentication token using `get_access_token()` from `fastmcp.server.dependencies`. This enables tools to make decisions based on user identity or permissions beyond simple authorization checks.
+
+```python
+from fastmcp import FastMCP
+from fastmcp.server.dependencies import get_access_token
+
+mcp = FastMCP("Token Access Server")
+
+@mcp.tool
+def personalized_greeting() -> str:
+ """Greet the user based on their token claims."""
+ token = get_access_token()
+
+ if token is None:
+ return "Hello, guest!"
+
+ name = token.claims.get("name", "user")
+ return f"Hello, {name}!"
+
+@mcp.tool
+def user_dashboard() -> dict:
+ """Return user-specific data based on token."""
+ token = get_access_token()
+
+ if token is None:
+ return {"error": "Not authenticated"}
+
+ return {
+ "client_id": token.client_id,
+ "scopes": token.scopes,
+ "claims": token.claims,
+ }
+```
+
+## AccessToken
+
+The `AccessToken` object contains information extracted from the OAuth token.
+
+| Property | Type | Description |
+|----------|------|-------------|
+| `token` | `str` | The raw token string |
+| `client_id` | `str \| None` | OAuth client identifier |
+| `scopes` | `list[str]` | Granted OAuth scopes |
+| `expires_at` | `datetime \| None` | Token expiration time |
+| `claims` | `dict[str, Any]` | All JWT claims or custom token data |
+
+The `claims` dictionary contains the full decoded token payload for JWT tokens, enabling authorization decisions based on any claim present in the token.
+
+## AuthContext
+
+The `AuthContext` dataclass is passed to all auth check functions.
+
+| Property | Type | Description |
+|----------|------|-------------|
+| `token` | `AccessToken \| None` | Current access token, or `None` if unauthenticated |
+| `tool` | `Tool` | The tool being accessed |
+
+Access to the tool object enables authorization decisions based on tool metadata like tags, name, or custom properties.
+
+```python
+from fastmcp.server.auth import AuthContext
+
+def require_matching_tag(ctx: AuthContext) -> bool:
+ """Require a scope matching each of the tool's tags."""
+ if ctx.token is None:
+ return False
+ user_scopes = set(ctx.token.scopes)
+ return ctx.tool.tags.issubset(user_scopes)
+```
+
+## Type Reference
+
+All authorization types are exported from `fastmcp.server.auth`:
+
+```python
+from fastmcp.server.auth import (
+ AccessToken, # Token with .token, .client_id, .scopes, .expires_at, .claims
+ AuthContext, # Context with .token, .tool
+ AuthCheck, # Type alias: Callable[[AuthContext], bool]
+ require_auth, # Built-in: requires any valid token
+ require_scopes, # Built-in: requires specific scopes
+ restrict_tag, # Built-in: tag-based scope requirements
+ run_auth_checks, # Utility: run checks with AND logic
+)
+```
+
+The `AuthMiddleware` is exported from `fastmcp.server.middleware`:
+
+```python
+from fastmcp.server.middleware import AuthMiddleware
+```
diff --git a/src/fastmcp/exceptions.py b/src/fastmcp/exceptions.py
index 4c1d6b590c..595961e3b7 100644
--- a/src/fastmcp/exceptions.py
+++ b/src/fastmcp/exceptions.py
@@ -37,3 +37,7 @@ class NotFoundError(Exception):
class DisabledError(Exception):
"""Object is disabled."""
+
+
+class AuthorizationError(FastMCPError):
+ """Error when authorization check fails."""
diff --git a/src/fastmcp/server/auth/__init__.py b/src/fastmcp/server/auth/__init__.py
index 43f2f3f2e4..d8d221a3d2 100644
--- a/src/fastmcp/server/auth/__init__.py
+++ b/src/fastmcp/server/auth/__init__.py
@@ -5,6 +5,14 @@
AccessToken,
AuthProvider,
)
+from .authorization import (
+ AuthCheck,
+ AuthContext,
+ require_auth,
+ require_scopes,
+ restrict_tag,
+ run_auth_checks,
+)
from .providers.debug import DebugTokenVerifier
from .providers.jwt import JWTVerifier, StaticTokenVerifier
from .oauth_proxy import OAuthProxy
@@ -13,6 +21,8 @@
__all__ = [
"AccessToken",
+ "AuthCheck",
+ "AuthContext",
"AuthProvider",
"DebugTokenVerifier",
"JWTVerifier",
@@ -22,4 +32,8 @@
"RemoteAuthProvider",
"StaticTokenVerifier",
"TokenVerifier",
+ "require_auth",
+ "require_scopes",
+ "restrict_tag",
+ "run_auth_checks",
]
diff --git a/src/fastmcp/server/auth/authorization.py b/src/fastmcp/server/auth/authorization.py
new file mode 100644
index 0000000000..1ae599260c
--- /dev/null
+++ b/src/fastmcp/server/auth/authorization.py
@@ -0,0 +1,177 @@
+"""Authorization checks for FastMCP tools.
+
+This module provides callable-based authorization for tools. Auth checks
+are functions that receive an AuthContext and return True to allow access
+or False to deny.
+
+Auth checks can also raise exceptions:
+- AuthorizationError: Propagates with the custom message for explicit denial
+- Other exceptions: Masked for security (logged, treated as auth failure)
+
+Example:
+ ```python
+ from fastmcp import FastMCP
+ from fastmcp.server.auth import require_auth, require_scopes
+
+ mcp = FastMCP()
+
+ @mcp.tool(auth=require_auth)
+ def protected_tool(): ...
+
+ @mcp.tool(auth=require_scopes("admin"))
+ def admin_tool(): ...
+
+ @mcp.tool(auth=[require_auth, require_scopes("write")])
+ def write_tool(): ...
+ ```
+"""
+
+from __future__ import annotations
+
+import logging
+from dataclasses import dataclass
+from typing import TYPE_CHECKING, Callable, cast
+
+from fastmcp.exceptions import AuthorizationError
+
+logger = logging.getLogger(__name__)
+
+if TYPE_CHECKING:
+ from fastmcp.server.auth import AccessToken
+ from fastmcp.tools.tool import Tool
+
+
+@dataclass
+class AuthContext:
+ """Context passed to auth check callables.
+
+ This object is passed to each auth check function and provides
+ access to the current authentication token and the tool being accessed.
+
+ Attributes:
+ token: The current access token, or None if unauthenticated.
+ tool: The tool being accessed.
+ """
+
+ token: AccessToken | None
+ tool: Tool
+
+
+# Type alias for auth check functions
+AuthCheck = Callable[[AuthContext], bool]
+
+
+def require_auth(ctx: AuthContext) -> bool:
+ """Require any valid authentication.
+
+ Returns True if the request has a valid token, False otherwise.
+
+ Example:
+ ```python
+ @mcp.tool(auth=require_auth)
+ def protected_tool(): ...
+ ```
+ """
+ return ctx.token is not None
+
+
+def require_scopes(*scopes: str) -> AuthCheck:
+ """Require specific OAuth scopes.
+
+ Returns an auth check that requires ALL specified scopes to be present
+ in the token (AND logic).
+
+ Args:
+ *scopes: One or more scope strings that must all be present.
+
+ Example:
+ ```python
+ @mcp.tool(auth=require_scopes("admin"))
+ def admin_tool(): ...
+
+ @mcp.tool(auth=require_scopes("read", "write"))
+ def read_write_tool(): ...
+ ```
+ """
+ required = set(scopes)
+
+ def check(ctx: AuthContext) -> bool:
+ if ctx.token is None:
+ return False
+ return required.issubset(set(ctx.token.scopes))
+
+ return check
+
+
+def restrict_tag(tag: str, *, scopes: list[str]) -> AuthCheck:
+ """Restrict tools with a specific tag to require certain scopes.
+
+ If the tool has the specified tag, the token must have ALL the
+ required scopes. If the tool doesn't have the tag, access is allowed.
+
+ Args:
+ tag: The tag that triggers the scope requirement.
+ scopes: List of scopes required when the tag is present.
+
+ Example:
+ ```python
+ # Tools tagged "admin" require the "admin" scope
+ AuthMiddleware(auth=restrict_tag("admin", scopes=["admin"]))
+ ```
+ """
+ required = set(scopes)
+
+ def check(ctx: AuthContext) -> bool:
+ if tag not in ctx.tool.tags:
+ return True # Tag not present, no restriction
+ if ctx.token is None:
+ return False
+ return required.issubset(set(ctx.token.scopes))
+
+ return check
+
+
+def run_auth_checks(
+ checks: AuthCheck | list[AuthCheck],
+ ctx: AuthContext,
+) -> bool:
+ """Run auth checks with AND logic.
+
+ All checks must pass for authorization to succeed.
+
+ Auth checks can:
+ - Return True to allow access
+ - Return False to deny access
+ - Raise AuthorizationError to deny with a custom message (propagates)
+ - Raise other exceptions (masked for security, treated as denial)
+
+ Args:
+ checks: A single check function or list of check functions.
+ ctx: The auth context to pass to each check.
+
+ Returns:
+ True if all checks pass, False if any check fails.
+
+ Raises:
+ AuthorizationError: If an auth check explicitly raises it.
+ """
+ check_list = [checks] if not isinstance(checks, list) else checks
+ check_list = cast(list[AuthCheck], check_list)
+
+ for check in check_list:
+ try:
+ if not check(ctx):
+ return False
+ except AuthorizationError:
+ # Let AuthorizationError propagate with its custom message
+ raise
+ except Exception:
+ # Mask other exceptions for security - log and treat as auth failure
+ logger.warning(
+ f"Auth check {getattr(check, '__name__', repr(check))} "
+ "raised an unexpected exception",
+ exc_info=True,
+ )
+ return False
+
+ return True
diff --git a/src/fastmcp/server/middleware/__init__.py b/src/fastmcp/server/middleware/__init__.py
index 98afc1875f..8df6962bdb 100644
--- a/src/fastmcp/server/middleware/__init__.py
+++ b/src/fastmcp/server/middleware/__init__.py
@@ -1,3 +1,4 @@
+from .authorization import AuthMiddleware
from .middleware import (
CallNext,
Middleware,
@@ -6,6 +7,7 @@
from .ping import PingMiddleware
__all__ = [
+ "AuthMiddleware",
"CallNext",
"Middleware",
"MiddlewareContext",
diff --git a/src/fastmcp/server/middleware/authorization.py b/src/fastmcp/server/middleware/authorization.py
new file mode 100644
index 0000000000..72ea81457c
--- /dev/null
+++ b/src/fastmcp/server/middleware/authorization.py
@@ -0,0 +1,144 @@
+"""Authorization middleware for FastMCP.
+
+This module provides middleware-based authorization using callable auth checks.
+AuthMiddleware applies auth checks globally to all tools on the server.
+
+Example:
+ ```python
+ from fastmcp import FastMCP
+ from fastmcp.server.auth import require_auth, require_scopes, restrict_tag
+ from fastmcp.server.middleware import AuthMiddleware
+
+ # Require auth for all tools
+ mcp = FastMCP(middleware=[
+ AuthMiddleware(auth=require_auth)
+ ])
+
+ # Tag-based: tools tagged "admin" require "admin" scope
+ mcp = FastMCP(middleware=[
+ AuthMiddleware(auth=restrict_tag("admin", scopes=["admin"]))
+ ])
+ ```
+"""
+
+from __future__ import annotations
+
+import logging
+from collections.abc import Sequence
+
+import mcp.types as mt
+
+from fastmcp.exceptions import AuthorizationError
+from fastmcp.server.auth.authorization import (
+ AuthCheck,
+ AuthContext,
+ run_auth_checks,
+)
+from fastmcp.server.dependencies import get_access_token
+from fastmcp.server.middleware.middleware import (
+ CallNext,
+ Middleware,
+ MiddlewareContext,
+)
+from fastmcp.tools.tool import Tool, ToolResult
+
+logger = logging.getLogger(__name__)
+
+
+class AuthMiddleware(Middleware):
+ """Global authorization middleware using callable checks.
+
+ This middleware applies auth checks to all tools on the server.
+ It uses the same callable API as tool-level auth checks.
+
+ The middleware:
+ - Filters tools from list_tools response based on auth checks
+ - Checks auth before tool execution in call_tool
+
+ Args:
+ auth: A single auth check function or list of check functions.
+ All checks must pass for authorization to succeed (AND logic).
+
+ Example:
+ ```python
+ from fastmcp import FastMCP
+ from fastmcp.server.auth import require_auth, require_scopes
+
+ # Require any authentication for all tools
+ mcp = FastMCP(middleware=[AuthMiddleware(auth=require_auth)])
+
+ # Require specific scope for all tools
+ mcp = FastMCP(middleware=[AuthMiddleware(auth=require_scopes("api"))])
+
+ # Combined checks (AND logic)
+ mcp = FastMCP(middleware=[
+ AuthMiddleware(auth=[require_auth, require_scopes("api")])
+ ])
+ ```
+ """
+
+ def __init__(self, auth: AuthCheck | list[AuthCheck]) -> None:
+ self.auth = auth
+
+ async def on_list_tools(
+ self,
+ context: MiddlewareContext[mt.ListToolsRequest],
+ call_next: CallNext[mt.ListToolsRequest, Sequence[Tool]],
+ ) -> Sequence[Tool]:
+ """Filter tools/list response based on auth checks."""
+ tools = await call_next(context)
+
+ # STDIO has no auth concept, skip filtering
+ # Late import to avoid circular import with context.py
+ from fastmcp.server.context import _current_transport
+
+ if _current_transport.get() == "stdio":
+ return tools
+
+ token = get_access_token()
+
+ authorized_tools: list[Tool] = []
+ for tool in tools:
+ ctx = AuthContext(token=token, tool=tool)
+ if run_auth_checks(self.auth, ctx):
+ authorized_tools.append(tool)
+
+ return authorized_tools
+
+ async def on_call_tool(
+ self,
+ context: MiddlewareContext[mt.CallToolRequestParams],
+ call_next: CallNext[mt.CallToolRequestParams, ToolResult],
+ ) -> ToolResult:
+ """Check auth before tool execution."""
+ # STDIO has no auth concept, skip enforcement
+ # Late import to avoid circular import with context.py
+ from fastmcp.server.context import _current_transport
+
+ if _current_transport.get() == "stdio":
+ return await call_next(context)
+
+ # Get the tool being called
+ tool_name = context.message.name
+ fastmcp = context.fastmcp_context
+ if fastmcp is None:
+ # Fail closed: deny access when context is missing
+ logger.warning(
+ f"AuthMiddleware: fastmcp_context is None for tool '{tool_name}'. "
+ "Denying access for security."
+ )
+ raise AuthorizationError(
+ f"Authorization failed for tool '{tool_name}': missing context"
+ )
+
+ tool = await fastmcp.fastmcp.get_tool(tool_name)
+
+ token = get_access_token()
+ ctx = AuthContext(token=token, tool=tool)
+
+ if not run_auth_checks(self.auth, ctx):
+ raise AuthorizationError(
+ f"Authorization failed for tool '{tool_name}': insufficient permissions"
+ )
+
+ return await call_next(context)
diff --git a/src/fastmcp/server/providers/local_provider.py b/src/fastmcp/server/providers/local_provider.py
index 645d17543c..ad74530edb 100644
--- a/src/fastmcp/server/providers/local_provider.py
+++ b/src/fastmcp/server/providers/local_provider.py
@@ -41,7 +41,7 @@ def greet(name: str) -> str:
from fastmcp.resources.template import ResourceTemplate
from fastmcp.server.providers.base import Provider
from fastmcp.server.tasks.config import TaskConfig
-from fastmcp.tools.tool import FunctionTool, Tool
+from fastmcp.tools.tool import AuthCheckCallable, FunctionTool, Tool
from fastmcp.tools.tool_transform import (
ToolTransformConfig,
apply_transformations_to_tools,
@@ -363,6 +363,7 @@ def tool(
enabled: bool = True,
task: bool | TaskConfig | None = None,
serializer: ToolResultSerializerType | None = None, # Deprecated
+ auth: AuthCheckCallable | list[AuthCheckCallable] | None = None,
) -> FunctionTool: ...
@overload
@@ -382,6 +383,7 @@ def tool(
enabled: bool = True,
task: bool | TaskConfig | None = None,
serializer: ToolResultSerializerType | None = None, # Deprecated
+ auth: AuthCheckCallable | list[AuthCheckCallable] | None = None,
) -> Callable[[AnyFunction], FunctionTool]: ...
# NOTE: This method mirrors fastmcp.tools.tool() but adds registration,
@@ -404,6 +406,7 @@ def tool(
enabled: bool = True,
task: bool | TaskConfig | None = None,
serializer: ToolResultSerializerType | None = None, # Deprecated
+ auth: AuthCheckCallable | list[AuthCheckCallable] | None = None,
) -> (
Callable[[AnyFunction], FunctionTool]
| FunctionTool
@@ -496,6 +499,7 @@ def my_tool(x: int) -> str:
meta=meta,
serializer=serializer,
task=supports_task,
+ auth=auth,
)
self.add_tool(tool_obj)
# If disabled, add to blocklist
@@ -534,6 +538,7 @@ def my_tool(x: int) -> str:
enabled=enabled,
task=task,
serializer=serializer,
+ auth=auth,
)
def resource(
diff --git a/src/fastmcp/server/server.py b/src/fastmcp/server/server.py
index d1b48ae9a9..f85eec7465 100644
--- a/src/fastmcp/server/server.py
+++ b/src/fastmcp/server/server.py
@@ -68,7 +68,8 @@
from fastmcp.prompts.prompt import FunctionPrompt, PromptResult
from fastmcp.resources.resource import Resource, ResourceResult
from fastmcp.resources.template import ResourceTemplate
-from fastmcp.server.auth import AuthProvider
+from fastmcp.server.auth import AuthContext, AuthProvider, run_auth_checks
+from fastmcp.server.dependencies import get_access_token
from fastmcp.server.event_store import EventStore
from fastmcp.server.http import (
StarletteWithLifespan,
@@ -83,7 +84,7 @@
from fastmcp.server.tasks.config import TaskConfig, TaskMeta
from fastmcp.settings import DuplicateBehavior as DuplicateBehaviorSetting
from fastmcp.settings import Settings
-from fastmcp.tools.tool import FunctionTool, Tool, ToolResult
+from fastmcp.tools.tool import AuthCheckCallable, FunctionTool, Tool, ToolResult
from fastmcp.tools.tool_transform import ToolTransformConfig
from fastmcp.utilities.async_utils import gather
from fastmcp.utilities.cli import log_server_banner
@@ -859,6 +860,14 @@ async def get_tools(self, *, run_middleware: bool = False) -> list[Tool]:
return_exceptions=True,
)
+ # Get token once for auth filtering (only applies to HTTP transports)
+ # STDIO has no auth concept, so skip auth checks entirely
+ # Late import to avoid circular import with context.py
+ from fastmcp.server.context import _current_transport
+
+ is_stdio = _current_transport.get() == "stdio"
+ token = None if is_stdio else get_access_token()
+
all_tools: dict[str, Tool] = {}
for i, result in enumerate(results):
if isinstance(result, BaseException):
@@ -868,21 +877,34 @@ async def get_tools(self, *, run_middleware: bool = False) -> list[Tool]:
raise result
continue
for tool in result:
- if self._is_component_enabled(tool) and tool.key not in all_tools:
- all_tools[tool.key] = tool
+ if not self._is_component_enabled(tool) or tool.key in all_tools:
+ continue
+ # Check tool-level auth (skip for STDIO)
+ if not is_stdio and tool.auth is not None:
+ ctx = AuthContext(token=token, tool=tool)
+ if not run_auth_checks(tool.auth, ctx):
+ continue
+ all_tools[tool.key] = tool
return list(all_tools.values())
async def get_tool(self, name: str) -> Tool:
"""Get an enabled tool by name.
Queries all providers in parallel to find the tool.
- First provider wins. Returns only if enabled.
+ First provider wins. Returns only if enabled and authorized.
"""
results = await gather(
*[p.get_tool(name) for p in self._providers],
return_exceptions=True,
)
+ # STDIO has no auth concept, so skip auth checks entirely
+ # Late import to avoid circular import with context.py
+ from fastmcp.server.context import _current_transport
+
+ is_stdio = _current_transport.get() == "stdio"
+ token = None if is_stdio else get_access_token()
+
for i, result in enumerate(results):
if isinstance(result, BaseException):
if not isinstance(result, NotFoundError):
@@ -891,6 +913,11 @@ async def get_tool(self, name: str) -> Tool:
)
continue
if isinstance(result, Tool) and self._is_component_enabled(result):
+ # Check tool-level auth (skip for STDIO)
+ if not is_stdio and result.auth is not None:
+ ctx = AuthContext(token=token, tool=result)
+ if not run_auth_checks(result.auth, ctx):
+ continue
return result
raise NotFoundError(f"Unknown tool: {name!r}")
@@ -1756,6 +1783,7 @@ def tool(
exclude_args: list[str] | None = None,
meta: dict[str, Any] | None = None,
task: bool | TaskConfig | None = None,
+ auth: AuthCheckCallable | list[AuthCheckCallable] | None = None,
) -> FunctionTool: ...
@overload
@@ -1773,6 +1801,7 @@ def tool(
exclude_args: list[str] | None = None,
meta: dict[str, Any] | None = None,
task: bool | TaskConfig | None = None,
+ auth: AuthCheckCallable | list[AuthCheckCallable] | None = None,
) -> Callable[[AnyFunction], FunctionTool]: ...
def tool(
@@ -1789,6 +1818,7 @@ def tool(
exclude_args: list[str] | None = None,
meta: dict[str, Any] | None = None,
task: bool | TaskConfig | None = None,
+ auth: AuthCheckCallable | list[AuthCheckCallable] | None = None,
) -> (
Callable[[AnyFunction], FunctionTool]
| FunctionTool
@@ -1856,6 +1886,7 @@ def my_tool(x: int) -> str:
meta=meta,
task=task if task is not None else self._support_tasks_by_default,
serializer=self._tool_serializer,
+ auth=auth,
)
return result
diff --git a/src/fastmcp/tools/tool.py b/src/fastmcp/tools/tool.py
index 576efefe28..1c6db5e9a3 100644
--- a/src/fastmcp/tools/tool.py
+++ b/src/fastmcp/tools/tool.py
@@ -51,6 +51,10 @@
replace_type,
)
+# Runtime type alias for auth checks to avoid circular imports with authorization.py
+# AuthCheck is Callable[[AuthContext], bool] but we use Any to avoid the import
+AuthCheckCallable: TypeAlias = Callable[[Any], bool]
+
if TYPE_CHECKING:
from docket import Docket
from docket.execution import Execution
@@ -166,6 +170,10 @@ class Tool(FastMCPComponent):
description="Deprecated. Return ToolResult from your tools for full control over serialization."
),
] = None
+ auth: Annotated[
+ AuthCheckCallable | list[AuthCheckCallable] | None,
+ Field(description="Authorization checks for this tool"),
+ ] = None
@model_validator(mode="after")
def _validate_tool_name(self) -> Tool:
@@ -215,6 +223,7 @@ def from_function(
serializer: ToolResultSerializerType | None = None, # Deprecated
meta: dict[str, Any] | None = None,
task: bool | TaskConfig | None = None,
+ auth: AuthCheckCallable | list[AuthCheckCallable] | None = None,
) -> FunctionTool:
"""Create a Tool from a function."""
return FunctionTool.from_function(
@@ -230,6 +239,7 @@ def from_function(
serializer=serializer,
meta=meta,
task=task,
+ auth=auth,
)
async def run(self, arguments: dict[str, Any]) -> ToolResult:
@@ -438,6 +448,7 @@ def from_function(
serializer: ToolResultSerializerType | None = None, # Deprecated
meta: dict[str, Any] | None = None,
task: bool | TaskConfig | None = None,
+ auth: AuthCheckCallable | list[AuthCheckCallable] | None = None,
) -> FunctionTool:
"""Create a Tool from a function."""
if serializer is not None and fastmcp.settings.deprecation_warnings:
@@ -500,6 +511,7 @@ def from_function(
serializer=serializer,
meta=meta,
task_config=task_config,
+ auth=auth,
)
async def run(self, arguments: dict[str, Any]) -> ToolResult:
@@ -793,6 +805,7 @@ def tool(
annotations: ToolAnnotations | dict[str, Any] | None = None,
meta: dict[str, Any] | None = None,
task: bool | TaskConfig | None = None,
+ auth: AuthCheckCallable | list[AuthCheckCallable] | None = None,
) -> Callable[[AnyFunction], FunctionTool]: ...
@@ -809,6 +822,7 @@ def tool(
annotations: ToolAnnotations | dict[str, Any] | None = None,
meta: dict[str, Any] | None = None,
task: bool | TaskConfig | None = None,
+ auth: AuthCheckCallable | list[AuthCheckCallable] | None = None,
) -> Callable[[AnyFunction], FunctionTool]: ...
@@ -824,6 +838,7 @@ def tool(
annotations: ToolAnnotations | dict[str, Any] | None = None,
meta: dict[str, Any] | None = None,
task: bool | TaskConfig | None = None,
+ auth: AuthCheckCallable | list[AuthCheckCallable] | None = None,
) -> (
Callable[[AnyFunction], FunctionTool]
| FunctionTool
@@ -917,6 +932,7 @@ def search(query: str) -> list[dict]:
annotations=annotations,
meta=meta,
task=supports_task,
+ auth=auth,
)
elif isinstance(name_or_fn, str):
@@ -947,4 +963,5 @@ def search(query: str) -> list[dict]:
annotations=annotations,
meta=meta,
task=task,
+ auth=auth,
)
diff --git a/src/fastmcp/tools/tool_transform.py b/src/fastmcp/tools/tool_transform.py
index b05f867689..b0700eea84 100644
--- a/src/fastmcp/tools/tool_transform.py
+++ b/src/fastmcp/tools/tool_transform.py
@@ -594,6 +594,7 @@ async def custom_output(**kwargs) -> ToolResult:
serializer=final_serializer,
meta=final_meta,
transform_args=transform_args,
+ auth=tool.auth,
)
return transformed_tool
diff --git a/tests/server/auth/test_authorization.py b/tests/server/auth/test_authorization.py
new file mode 100644
index 0000000000..051bef00ab
--- /dev/null
+++ b/tests/server/auth/test_authorization.py
@@ -0,0 +1,598 @@
+"""Tests for authorization checks and AuthMiddleware."""
+
+from unittest.mock import Mock
+
+import pytest
+from mcp.server.auth.middleware.auth_context import auth_context_var
+from mcp.server.auth.middleware.bearer_auth import AuthenticatedUser
+
+from fastmcp import FastMCP
+from fastmcp.client import Client
+from fastmcp.exceptions import NotFoundError
+from fastmcp.server.auth import (
+ AccessToken,
+ AuthContext,
+ require_auth,
+ require_scopes,
+ restrict_tag,
+ run_auth_checks,
+)
+from fastmcp.server.middleware import AuthMiddleware
+
+
+# =============================================================================
+# Test helpers
+# =============================================================================
+
+
+def make_token(scopes: list[str] | None = None) -> AccessToken:
+ """Create a test access token."""
+ return AccessToken(
+ token="test-token",
+ client_id="test-client",
+ scopes=scopes or [],
+ expires_at=None,
+ claims={},
+ )
+
+
+def make_tool() -> Mock:
+ """Create a mock tool for testing."""
+ tool = Mock()
+ tool.tags = set()
+ return tool
+
+
+# =============================================================================
+# Tests for require_auth
+# =============================================================================
+
+
+class TestRequireAuth:
+ def test_returns_true_with_token(self):
+ ctx = AuthContext(token=make_token(), tool=make_tool())
+ assert require_auth(ctx) is True
+
+ def test_returns_false_without_token(self):
+ ctx = AuthContext(token=None, tool=make_tool())
+ assert require_auth(ctx) is False
+
+
+# =============================================================================
+# Tests for require_scopes
+# =============================================================================
+
+
+class TestRequireScopes:
+ def test_returns_true_with_matching_scope(self):
+ token = make_token(scopes=["admin"])
+ ctx = AuthContext(token=token, tool=make_tool())
+ check = require_scopes("admin")
+ assert check(ctx) is True
+
+ def test_returns_true_with_all_required_scopes(self):
+ token = make_token(scopes=["read", "write", "admin"])
+ ctx = AuthContext(token=token, tool=make_tool())
+ check = require_scopes("read", "write")
+ assert check(ctx) is True
+
+ def test_returns_false_with_missing_scope(self):
+ token = make_token(scopes=["read"])
+ ctx = AuthContext(token=token, tool=make_tool())
+ check = require_scopes("admin")
+ assert check(ctx) is False
+
+ def test_returns_false_with_partial_scopes(self):
+ token = make_token(scopes=["read"])
+ ctx = AuthContext(token=token, tool=make_tool())
+ check = require_scopes("read", "write")
+ assert check(ctx) is False
+
+ def test_returns_false_without_token(self):
+ ctx = AuthContext(token=None, tool=make_tool())
+ check = require_scopes("admin")
+ assert check(ctx) is False
+
+
+# =============================================================================
+# Tests for restrict_tag
+# =============================================================================
+
+
+class TestRestrictTag:
+ def test_allows_access_when_tag_not_present(self):
+ tool = make_tool()
+ tool.tags = {"other"}
+ ctx = AuthContext(token=None, tool=tool)
+ check = restrict_tag("admin", scopes=["admin"])
+ assert check(ctx) is True
+
+ def test_blocks_access_when_tag_present_without_token(self):
+ tool = make_tool()
+ tool.tags = {"admin"}
+ ctx = AuthContext(token=None, tool=tool)
+ check = restrict_tag("admin", scopes=["admin"])
+ assert check(ctx) is False
+
+ def test_blocks_access_when_tag_present_without_scope(self):
+ tool = make_tool()
+ tool.tags = {"admin"}
+ token = make_token(scopes=["read"])
+ ctx = AuthContext(token=token, tool=tool)
+ check = restrict_tag("admin", scopes=["admin"])
+ assert check(ctx) is False
+
+ def test_allows_access_when_tag_present_with_scope(self):
+ tool = make_tool()
+ tool.tags = {"admin"}
+ token = make_token(scopes=["admin"])
+ ctx = AuthContext(token=token, tool=tool)
+ check = restrict_tag("admin", scopes=["admin"])
+ assert check(ctx) is True
+
+
+# =============================================================================
+# Tests for run_auth_checks
+# =============================================================================
+
+
+class TestRunAuthChecks:
+ def test_single_check_passes(self):
+ ctx = AuthContext(token=make_token(), tool=make_tool())
+ assert run_auth_checks(require_auth, ctx) is True
+
+ def test_single_check_fails(self):
+ ctx = AuthContext(token=None, tool=make_tool())
+ assert run_auth_checks(require_auth, ctx) is False
+
+ def test_multiple_checks_all_pass(self):
+ token = make_token(scopes=["admin"])
+ ctx = AuthContext(token=token, tool=make_tool())
+ checks = [require_auth, require_scopes("admin")]
+ assert run_auth_checks(checks, ctx) is True
+
+ def test_multiple_checks_one_fails(self):
+ token = make_token(scopes=["read"])
+ ctx = AuthContext(token=token, tool=make_tool())
+ checks = [require_auth, require_scopes("admin")]
+ assert run_auth_checks(checks, ctx) is False
+
+ def test_empty_list_passes(self):
+ ctx = AuthContext(token=None, tool=make_tool())
+ assert run_auth_checks([], ctx) is True
+
+ def test_custom_lambda_check(self):
+ token = make_token()
+ token.claims = {"level": 5}
+ ctx = AuthContext(token=token, tool=make_tool())
+ check = lambda ctx: ctx.token is not None and ctx.token.claims.get("level", 0) >= 3
+ assert run_auth_checks(check, ctx) is True
+
+ def test_authorization_error_propagates(self):
+ """AuthorizationError from auth check should propagate with custom message."""
+ from fastmcp.exceptions import AuthorizationError
+
+ def custom_auth_check(ctx: AuthContext) -> bool:
+ raise AuthorizationError("Custom denial reason")
+
+ ctx = AuthContext(token=make_token(), tool=make_tool())
+ with pytest.raises(AuthorizationError, match="Custom denial reason"):
+ run_auth_checks(custom_auth_check, ctx)
+
+ def test_generic_exception_is_masked(self):
+ """Generic exceptions from auth checks should be masked (return False)."""
+
+ def buggy_auth_check(ctx: AuthContext) -> bool:
+ raise ValueError("Unexpected internal error")
+
+ ctx = AuthContext(token=make_token(), tool=make_tool())
+ # Should return False, not raise the ValueError
+ assert run_auth_checks(buggy_auth_check, ctx) is False
+
+ def test_authorization_error_stops_chain(self):
+ """AuthorizationError should stop the check chain and propagate."""
+ from fastmcp.exceptions import AuthorizationError
+
+ call_order = []
+
+ def check_1(ctx: AuthContext) -> bool:
+ call_order.append(1)
+ return True
+
+ def check_2(ctx: AuthContext) -> bool:
+ call_order.append(2)
+ raise AuthorizationError("Explicit denial")
+
+ def check_3(ctx: AuthContext) -> bool:
+ call_order.append(3)
+ return True
+
+ ctx = AuthContext(token=make_token(), tool=make_tool())
+ with pytest.raises(AuthorizationError, match="Explicit denial"):
+ run_auth_checks([check_1, check_2, check_3], ctx)
+
+ # Check 3 should not be called
+ assert call_order == [1, 2]
+
+
+# =============================================================================
+# Tests for tool-level auth with FastMCP
+# =============================================================================
+
+
+def set_token(token: AccessToken | None):
+ """Set the access token in the auth context var."""
+ if token is None:
+ return auth_context_var.set(None)
+ return auth_context_var.set(AuthenticatedUser(token))
+
+
+class TestToolLevelAuth:
+ async def test_tool_without_auth_is_visible(self):
+ mcp = FastMCP()
+
+ @mcp.tool
+ def public_tool() -> str:
+ return "public"
+
+ tools = await mcp.get_tools()
+ assert len(tools) == 1
+ assert tools[0].name == "public_tool"
+
+ async def test_tool_with_auth_hidden_without_token(self):
+ mcp = FastMCP()
+
+ @mcp.tool(auth=require_auth)
+ def protected_tool() -> str:
+ return "protected"
+
+ # No token set - tool should be hidden
+ tools = await mcp.get_tools()
+ assert len(tools) == 0
+
+ async def test_tool_with_auth_visible_with_token(self):
+ mcp = FastMCP()
+
+ @mcp.tool(auth=require_auth)
+ def protected_tool() -> str:
+ return "protected"
+
+ # Set token in context
+ token = make_token()
+ tok = set_token(token)
+ try:
+ tools = await mcp.get_tools()
+ assert len(tools) == 1
+ assert tools[0].name == "protected_tool"
+ finally:
+ auth_context_var.reset(tok)
+
+ async def test_tool_with_scope_auth_hidden_without_scope(self):
+ mcp = FastMCP()
+
+ @mcp.tool(auth=require_scopes("admin"))
+ def admin_tool() -> str:
+ return "admin"
+
+ # Token without admin scope
+ token = make_token(scopes=["read"])
+ tok = set_token(token)
+ try:
+ tools = await mcp.get_tools()
+ assert len(tools) == 0
+ finally:
+ auth_context_var.reset(tok)
+
+ async def test_tool_with_scope_auth_visible_with_scope(self):
+ mcp = FastMCP()
+
+ @mcp.tool(auth=require_scopes("admin"))
+ def admin_tool() -> str:
+ return "admin"
+
+ # Token with admin scope
+ token = make_token(scopes=["admin"])
+ tok = set_token(token)
+ try:
+ tools = await mcp.get_tools()
+ assert len(tools) == 1
+ assert tools[0].name == "admin_tool"
+ finally:
+ auth_context_var.reset(tok)
+
+ async def test_get_tool_returns_not_found_without_auth(self):
+ mcp = FastMCP()
+
+ @mcp.tool(auth=require_auth)
+ def protected_tool() -> str:
+ return "protected"
+
+ with pytest.raises(NotFoundError):
+ await mcp.get_tool("protected_tool")
+
+ async def test_get_tool_returns_tool_with_auth(self):
+ mcp = FastMCP()
+
+ @mcp.tool(auth=require_auth)
+ def protected_tool() -> str:
+ return "protected"
+
+ token = make_token()
+ tok = set_token(token)
+ try:
+ tool = await mcp.get_tool("protected_tool")
+ assert tool.name == "protected_tool"
+ finally:
+ auth_context_var.reset(tok)
+
+
+# =============================================================================
+# Tests for AuthMiddleware
+# =============================================================================
+
+
+class TestAuthMiddleware:
+ async def test_middleware_filters_tools_without_token(self):
+ mcp = FastMCP(middleware=[AuthMiddleware(auth=require_auth)])
+
+ @mcp.tool
+ def public_tool() -> str:
+ return "public"
+
+ # No token - all tools filtered by middleware
+ tools = await mcp.get_tools(run_middleware=True)
+ assert len(tools) == 0
+
+ async def test_middleware_allows_tools_with_token(self):
+ mcp = FastMCP(middleware=[AuthMiddleware(auth=require_auth)])
+
+ @mcp.tool
+ def public_tool() -> str:
+ return "public"
+
+ token = make_token()
+ tok = set_token(token)
+ try:
+ tools = await mcp.get_tools(run_middleware=True)
+ assert len(tools) == 1
+ finally:
+ auth_context_var.reset(tok)
+
+ async def test_middleware_with_scope_check(self):
+ mcp = FastMCP(middleware=[AuthMiddleware(auth=require_scopes("api"))])
+
+ @mcp.tool
+ def api_tool() -> str:
+ return "api"
+
+ # Token without api scope
+ token = make_token(scopes=["read"])
+ tok = set_token(token)
+ try:
+ tools = await mcp.get_tools(run_middleware=True)
+ assert len(tools) == 0
+ finally:
+ auth_context_var.reset(tok)
+
+ # Token with api scope
+ token = make_token(scopes=["api"])
+ tok = set_token(token)
+ try:
+ tools = await mcp.get_tools(run_middleware=True)
+ assert len(tools) == 1
+ finally:
+ auth_context_var.reset(tok)
+
+ async def test_middleware_with_restrict_tag(self):
+ mcp = FastMCP(
+ middleware=[AuthMiddleware(auth=restrict_tag("admin", scopes=["admin"]))]
+ )
+
+ @mcp.tool
+ def public_tool() -> str:
+ return "public"
+
+ @mcp.tool(tags={"admin"})
+ def admin_tool() -> str:
+ return "admin"
+
+ # No token - public tool allowed, admin tool blocked
+ tools = await mcp.get_tools(run_middleware=True)
+ assert len(tools) == 1
+ assert tools[0].name == "public_tool"
+
+ # Token with admin scope - both allowed
+ token = make_token(scopes=["admin"])
+ tok = set_token(token)
+ try:
+ tools = await mcp.get_tools(run_middleware=True)
+ assert len(tools) == 2
+ finally:
+ auth_context_var.reset(tok)
+
+
+# =============================================================================
+# Integration tests with Client
+# =============================================================================
+
+
+class TestAuthIntegration:
+ async def test_client_only_sees_authorized_tools(self):
+ mcp = FastMCP()
+
+ @mcp.tool
+ def public_tool() -> str:
+ return "public"
+
+ @mcp.tool(auth=require_auth)
+ def protected_tool() -> str:
+ return "protected"
+
+ async with Client(mcp) as client:
+ # No token - only public tool visible
+ tools = await client.list_tools()
+ assert len(tools) == 1
+ assert tools[0].name == "public_tool"
+
+ async def test_client_with_token_sees_all_authorized_tools(self):
+ mcp = FastMCP()
+
+ @mcp.tool
+ def public_tool() -> str:
+ return "public"
+
+ @mcp.tool(auth=require_auth)
+ def protected_tool() -> str:
+ return "protected"
+
+ # Set token before creating client
+ token = make_token()
+ tok = set_token(token)
+ try:
+ async with Client(mcp) as client:
+ tools = await client.list_tools()
+ tool_names = [t.name for t in tools]
+ # With token, both tools should be visible
+ assert "public_tool" in tool_names
+ assert "protected_tool" in tool_names
+ finally:
+ auth_context_var.reset(tok)
+
+
+# =============================================================================
+# Tests for transformed tools preserving auth
+# =============================================================================
+
+
+class TestTransformedToolAuth:
+ async def test_transformed_tool_preserves_auth(self):
+ """Transformed tools should inherit auth from parent."""
+ from fastmcp.tools.tool_transform import TransformedTool
+
+ mcp = FastMCP()
+
+ @mcp.tool(auth=require_auth)
+ def protected_tool(x: int) -> str:
+ return str(x)
+
+ # Get the tool and transform it
+ tools = await mcp._local_provider.list_tools()
+ original_tool = tools[0]
+ assert original_tool.auth is not None
+
+ # Transform the tool
+ transformed = TransformedTool.from_tool(
+ original_tool,
+ name="transformed_protected",
+ )
+
+ # Auth should be preserved
+ assert transformed.auth is not None
+ assert transformed.auth == original_tool.auth
+
+ async def test_transformed_tool_filtered_without_token(self):
+ """Transformed tools with auth should be filtered without token."""
+ from fastmcp.tools.tool_transform import ToolTransformConfig
+
+ mcp = FastMCP()
+
+ @mcp.tool(auth=require_auth)
+ def protected_tool(x: int) -> str:
+ return str(x)
+
+ # Add transformation
+ mcp.add_tool_transformation(
+ "protected_tool", ToolTransformConfig(name="renamed_protected")
+ )
+
+ # Without token, transformed tool should not be visible
+ tools = await mcp.get_tools()
+ assert len(tools) == 0
+
+ async def test_transformed_tool_visible_with_token(self):
+ """Transformed tools with auth should be visible with token."""
+ from fastmcp.tools.tool_transform import ToolTransformConfig
+
+ mcp = FastMCP()
+
+ @mcp.tool(auth=require_auth)
+ def protected_tool(x: int) -> str:
+ return str(x)
+
+ # Add transformation
+ mcp.add_tool_transformation(
+ "protected_tool", ToolTransformConfig(name="renamed_protected")
+ )
+
+ # With token, transformed tool should be visible
+ token = make_token()
+ tok = set_token(token)
+ try:
+ tools = await mcp.get_tools()
+ assert len(tools) == 1
+ assert tools[0].name == "renamed_protected"
+ finally:
+ auth_context_var.reset(tok)
+
+
+# =============================================================================
+# Tests for AuthMiddleware on_call_tool enforcement
+# =============================================================================
+
+
+class TestAuthMiddlewareCallTool:
+ async def test_middleware_blocks_call_without_auth(self):
+ """AuthMiddleware should raise AuthorizationError on unauthorized call."""
+ from fastmcp.exceptions import AuthorizationError
+
+ mcp = FastMCP(middleware=[AuthMiddleware(auth=require_auth)])
+
+ @mcp.tool
+ def my_tool() -> str:
+ return "result"
+
+ # Without token, calling the tool should raise AuthorizationError
+ async with Client(mcp) as client:
+ with pytest.raises(Exception) as exc_info:
+ await client.call_tool("my_tool", {})
+ # The error message should indicate authorization failure
+ assert "authorization" in str(exc_info.value).lower() or "insufficient" in str(exc_info.value).lower()
+
+ async def test_middleware_allows_call_with_auth(self):
+ """AuthMiddleware should allow tool call with valid token."""
+ mcp = FastMCP(middleware=[AuthMiddleware(auth=require_auth)])
+
+ @mcp.tool
+ def my_tool() -> str:
+ return "result"
+
+ # With token, calling the tool should succeed
+ token = make_token()
+ tok = set_token(token)
+ try:
+ async with Client(mcp) as client:
+ result = await client.call_tool("my_tool", {})
+ assert result.content[0].text == "result"
+ finally:
+ auth_context_var.reset(tok)
+
+ async def test_middleware_blocks_call_with_wrong_scope(self):
+ """AuthMiddleware should block calls when scope requirements aren't met."""
+ from fastmcp.exceptions import AuthorizationError
+
+ mcp = FastMCP(middleware=[AuthMiddleware(auth=require_scopes("admin"))])
+
+ @mcp.tool
+ def admin_tool() -> str:
+ return "admin result"
+
+ # With token that lacks admin scope
+ token = make_token(scopes=["read"])
+ tok = set_token(token)
+ try:
+ async with Client(mcp) as client:
+ with pytest.raises(Exception) as exc_info:
+ await client.call_tool("admin_tool", {})
+ assert "authorization" in str(exc_info.value).lower() or "insufficient" in str(exc_info.value).lower()
+ finally:
+ auth_context_var.reset(tok)
diff --git a/uv.lock b/uv.lock
index 600f0392f4..30cbf4e213 100644
--- a/uv.lock
+++ b/uv.lock
@@ -723,6 +723,7 @@ dev = [
{ name = "inline-snapshot", extra = ["dirty-equals"] },
{ name = "ipython", version = "8.37.0", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.11'" },
{ name = "ipython", version = "9.8.0", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.11'" },
+ { name = "loq" },
{ name = "pdbpp" },
{ name = "prek" },
{ name = "psutil" },
@@ -774,6 +775,7 @@ dev = [
{ name = "fastmcp", extras = ["anthropic", "openai", "tasks"] },
{ name = "inline-snapshot", extras = ["dirty-equals"], specifier = ">=0.27.2" },
{ name = "ipython", specifier = ">=8.12.3" },
+ { name = "loq", specifier = ">=0.1.0a3" },
{ name = "pdbpp", specifier = ">=0.11.7" },
{ name = "prek", specifier = ">=0.2.12" },
{ name = "psutil", specifier = ">=7.0.0" },
@@ -1175,6 +1177,21 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/81/db/e655086b7f3a705df045bf0933bdd9c2f79bb3c97bfef1384598bb79a217/keyring-25.7.0-py3-none-any.whl", hash = "sha256:be4a0b195f149690c166e850609a477c532ddbfbaed96a404d4e43f8d5e2689f", size = 39160, upload-time = "2025-11-16T16:26:08.402Z" },
]
+[[package]]
+name = "loq"
+version = "0.1.0a3"
+source = { registry = "https://pypi.org/simple" }
+sdist = { url = "https://files.pythonhosted.org/packages/87/2d/911c0ca3175aa30358eb0b1dd4789e3695292db63dc50da34be0d8aa001a/loq-0.1.0a3.tar.gz", hash = "sha256:8f34ed32eed79d3257fc6c6e2ffc4970795bae0fbd7dee95cdb027d351a87839", size = 49278, upload-time = "2026-01-12T05:13:34.14Z" }
+wheels = [
+ { url = "https://files.pythonhosted.org/packages/6a/7c/d028a1041caef85844b7a68926138daedac6ab4aa720f6cb8c77a30dcb49/loq-0.1.0a3-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:e65cb220446f1a2fad5c0e3e0c97715204652cb3896b35bb50967745d1f74ade", size = 1285120, upload-time = "2026-01-12T05:13:29.418Z" },
+ { url = "https://files.pythonhosted.org/packages/91/49/cfa207080940c9d447ca4633d7a25790d50e936de29d154dcc1941c45e54/loq-0.1.0a3-py3-none-macosx_11_0_arm64.whl", hash = "sha256:20afc0638df897312ea33f064cd8d05ee437c77ff6fca890ac33d41cc5fe9dbd", size = 1212512, upload-time = "2026-01-12T05:13:26.353Z" },
+ { url = "https://files.pythonhosted.org/packages/80/d9/68f2418009771b764fc6fcbd379892464c7d96cf668ea570f8b3d6e93806/loq-0.1.0a3-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:efb606799326bdba9e0931ae6664076eb24ea9ce29f540ec62071f3da7d89753", size = 1232624, upload-time = "2026-01-12T05:13:30.431Z" },
+ { url = "https://files.pythonhosted.org/packages/cc/27/10e195de9e4e843fed4a098f4a7b3e2b78a0218d317adb3ea83b341fa8e9/loq-0.1.0a3-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:63a146821abc50f9166fdec83a92330ddec81d78fc02d0956cf00957fc02f4e6", size = 1324994, upload-time = "2026-01-12T05:13:27.895Z" },
+ { url = "https://files.pythonhosted.org/packages/98/09/76677afb5f9d380a89cf59489f03cdb7c1c6318d1c81c6fc3073cce93893/loq-0.1.0a3-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:afc46f65303b9ff64ae0e517f4834cf0e5b97cd5b375cbfd66dadf267f94f725", size = 1272635, upload-time = "2026-01-12T05:13:23.759Z" },
+ { url = "https://files.pythonhosted.org/packages/65/77/fda09acd73fc44f6a7cfa9851dd8d8fdab0829311b9c7fa1f1af30b3affa/loq-0.1.0a3-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:3e5501abcbab67d14e6c71a6c010609430a9495a888e10047d5542ffd46acd65", size = 1389293, upload-time = "2026-01-12T05:13:32.655Z" },
+ { url = "https://files.pythonhosted.org/packages/39/68/2dbb5e29e0333f036b92b09f6b54bbd8e68b51b780051f01e15c307b6ab1/loq-0.1.0a3-py3-none-win_amd64.whl", hash = "sha256:51cd849b94f09d211bb842809cce4172ee0904092be2643bdb9629001745f3b9", size = 1283650, upload-time = "2026-01-12T05:13:25.042Z" },
+]
+
[[package]]
name = "lupa"
version = "2.6"
From 030e7190c2b413df5967214c240f685a1d7115c6 Mon Sep 17 00:00:00 2001
From: Jeremiah Lowin <153965+jlowin@users.noreply.github.com>
Date: Mon, 12 Jan 2026 17:54:44 -0500
Subject: [PATCH 2/8] Extend authorization to resources and prompts
- AuthContext now uses `component` field instead of `tool`
(tool is a backwards-compatible property)
- Add `auth` parameter to all resource/prompt decorators
- Add auth filtering to get_resources, get_prompts, etc.
- AuthMiddleware now handles all component types
- Catch AuthorizationError in list operations (hide instead of fail)
---
src/fastmcp/prompts/prompt.py | 14 ++
src/fastmcp/resources/resource.py | 13 ++
src/fastmcp/resources/template.py | 8 +
src/fastmcp/server/auth/authorization.py | 45 +++--
.../server/middleware/authorization.py | 173 ++++++++++++++++--
.../server/providers/local_provider.py | 8 +
src/fastmcp/server/server.py | 134 ++++++++++++--
tests/server/auth/test_authorization.py | 58 +++---
8 files changed, 382 insertions(+), 71 deletions(-)
diff --git a/src/fastmcp/prompts/prompt.py b/src/fastmcp/prompts/prompt.py
index 7b2f9f4560..7fc691dd11 100644
--- a/src/fastmcp/prompts/prompt.py
+++ b/src/fastmcp/prompts/prompt.py
@@ -32,6 +32,7 @@
without_injected_parameters,
)
from fastmcp.server.tasks.config import TaskConfig, TaskMeta
+from fastmcp.tools.tool import AuthCheckCallable
from fastmcp.utilities.components import FastMCPComponent
from fastmcp.utilities.json_schema import compress_schema
from fastmcp.utilities.logging import get_logger
@@ -200,6 +201,9 @@ class Prompt(FastMCPComponent):
arguments: list[PromptArgument] | None = Field(
default=None, description="Arguments that can be passed to the prompt"
)
+ auth: AuthCheckCallable | list[AuthCheckCallable] | None = Field(
+ default=None, description="Authorization checks for this prompt"
+ )
def to_mcp_prompt(
self,
@@ -238,6 +242,7 @@ def from_function(
tags: set[str] | None = None,
meta: dict[str, Any] | None = None,
task: bool | TaskConfig | None = None,
+ auth: AuthCheckCallable | list[AuthCheckCallable] | None = None,
) -> FunctionPrompt:
"""Create a Prompt from a function.
@@ -255,6 +260,7 @@ def from_function(
tags=tags,
meta=meta,
task=task,
+ auth=auth,
)
async def render(
@@ -406,6 +412,7 @@ def from_function(
tags: set[str] | None = None,
meta: dict[str, Any] | None = None,
task: bool | TaskConfig | None = None,
+ auth: AuthCheckCallable | list[AuthCheckCallable] | None = None,
) -> FunctionPrompt:
"""Create a Prompt from a function.
@@ -504,6 +511,7 @@ def from_function(
fn=wrapped_fn,
meta=meta,
task_config=task_config,
+ auth=auth,
)
def _convert_string_arguments(self, kwargs: dict[str, Any]) -> dict[str, Any]:
@@ -635,6 +643,7 @@ def prompt(
tags: set[str] | None = None,
meta: dict[str, Any] | None = None,
task: bool | TaskConfig | None = None,
+ auth: AuthCheckCallable | list[AuthCheckCallable] | None = None,
) -> Callable[[AnyFunction], FunctionPrompt]: ...
@@ -649,6 +658,7 @@ def prompt(
tags: set[str] | None = None,
meta: dict[str, Any] | None = None,
task: bool | TaskConfig | None = None,
+ auth: AuthCheckCallable | list[AuthCheckCallable] | None = None,
) -> Callable[[AnyFunction], FunctionPrompt]: ...
@@ -662,6 +672,7 @@ def prompt(
tags: set[str] | None = None,
meta: dict[str, Any] | None = None,
task: bool | TaskConfig | None = None,
+ auth: AuthCheckCallable | list[AuthCheckCallable] | None = None,
) -> (
Callable[[AnyFunction], FunctionPrompt]
| FunctionPrompt
@@ -693,6 +704,7 @@ def prompt(
tags: Optional set of tags for categorizing the prompt
meta: Optional meta information about the prompt
task: Optional task configuration for background execution (default False)
+ auth: Optional authorization checks for the prompt
Returns:
A FunctionPrompt when decorating a function, or a decorator function when
@@ -748,6 +760,7 @@ def my_prompt(data: str) -> str:
tags=tags,
meta=meta,
task=supports_task,
+ auth=auth,
)
elif isinstance(name_or_fn, str):
@@ -776,4 +789,5 @@ def my_prompt(data: str) -> str:
tags=tags,
meta=meta,
task=task,
+ auth=auth,
)
diff --git a/src/fastmcp/resources/resource.py b/src/fastmcp/resources/resource.py
index 8d2c023c33..ca81676908 100644
--- a/src/fastmcp/resources/resource.py
+++ b/src/fastmcp/resources/resource.py
@@ -33,6 +33,7 @@
without_injected_parameters,
)
from fastmcp.server.tasks.config import TaskConfig, TaskMeta
+from fastmcp.tools.tool import AuthCheckCallable
from fastmcp.utilities.components import FastMCPComponent
from fastmcp.utilities.types import get_fn_name
@@ -229,6 +230,10 @@ class Resource(FastMCPComponent):
Annotations | None,
Field(description="Optional annotations about the resource's behavior"),
] = None
+ auth: Annotated[
+ AuthCheckCallable | list[AuthCheckCallable] | None,
+ Field(description="Authorization checks for this resource"),
+ ] = None
@staticmethod
def from_function(
@@ -243,6 +248,7 @@ def from_function(
annotations: Annotations | None = None,
meta: dict[str, Any] | None = None,
task: bool | TaskConfig | None = None,
+ auth: AuthCheckCallable | list[AuthCheckCallable] | None = None,
) -> FunctionResource:
return FunctionResource.from_function(
fn=fn,
@@ -256,6 +262,7 @@ def from_function(
annotations=annotations,
meta=meta,
task=task,
+ auth=auth,
)
@field_validator("mime_type", mode="before")
@@ -431,6 +438,7 @@ def from_function(
annotations: Annotations | None = None,
meta: dict[str, Any] | None = None,
task: bool | TaskConfig | None = None,
+ auth: AuthCheckCallable | list[AuthCheckCallable] | None = None,
) -> FunctionResource:
"""Create a FunctionResource from a function."""
if isinstance(uri, str):
@@ -465,6 +473,7 @@ def from_function(
annotations=annotations,
meta=meta,
task_config=task_config,
+ auth=auth,
)
async def read(
@@ -510,6 +519,7 @@ def resource(
annotations: Annotations | dict[str, Any] | None = None,
meta: dict[str, Any] | None = None,
task: bool | TaskConfig | None = None,
+ auth: AuthCheckCallable | list[AuthCheckCallable] | None = None,
) -> Callable[[AnyFunction], Resource | ResourceTemplate]:
"""Standalone decorator to create a resource without registering it to a server.
@@ -536,6 +546,7 @@ def resource(
annotations: Optional annotations about the resource's behavior
meta: Optional meta information about the resource
task: Optional task configuration for background execution (default False)
+ auth: Optional authorization checks for the resource
Returns:
A decorator function that returns a Resource or ResourceTemplate.
@@ -608,6 +619,7 @@ def decorator(fn: AnyFunction) -> Resource | ResourceTemplate:
annotations=annotations,
meta=meta,
task=supports_task,
+ auth=auth,
)
else:
return Resource.from_function(
@@ -622,6 +634,7 @@ def decorator(fn: AnyFunction) -> Resource | ResourceTemplate:
annotations=annotations,
meta=meta,
task=supports_task,
+ auth=auth,
)
return decorator
diff --git a/src/fastmcp/resources/template.py b/src/fastmcp/resources/template.py
index 4f7c329288..be1597742a 100644
--- a/src/fastmcp/resources/template.py
+++ b/src/fastmcp/resources/template.py
@@ -27,6 +27,7 @@
without_injected_parameters,
)
from fastmcp.server.tasks.config import TaskConfig, TaskMeta
+from fastmcp.tools.tool import AuthCheckCallable
from fastmcp.utilities.components import FastMCPComponent
from fastmcp.utilities.json_schema import compress_schema
from fastmcp.utilities.types import get_cached_typeadapter
@@ -114,6 +115,9 @@ class ResourceTemplate(FastMCPComponent):
annotations: Annotations | None = Field(
default=None, description="Optional annotations about the resource's behavior"
)
+ auth: AuthCheckCallable | list[AuthCheckCallable] | None = Field(
+ default=None, description="Authorization checks for this resource template"
+ )
def __repr__(self) -> str:
return f"{self.__class__.__name__}(uri_template={self.uri_template!r}, name={self.name!r}, description={self.description!r}, tags={self.tags})"
@@ -131,6 +135,7 @@ def from_function(
annotations: Annotations | None = None,
meta: dict[str, Any] | None = None,
task: bool | TaskConfig | None = None,
+ auth: AuthCheckCallable | list[AuthCheckCallable] | None = None,
) -> FunctionResourceTemplate:
return FunctionResourceTemplate.from_function(
fn=fn,
@@ -144,6 +149,7 @@ def from_function(
annotations=annotations,
meta=meta,
task=task,
+ auth=auth,
)
@field_validator("mime_type", mode="before")
@@ -452,6 +458,7 @@ def from_function(
annotations: Annotations | None = None,
meta: dict[str, Any] | None = None,
task: bool | TaskConfig | None = None,
+ auth: AuthCheckCallable | list[AuthCheckCallable] | None = None,
) -> FunctionResourceTemplate:
"""Create a template from a function."""
@@ -562,4 +569,5 @@ def from_function(
annotations=annotations,
meta=meta,
task_config=task_config,
+ auth=auth,
)
diff --git a/src/fastmcp/server/auth/authorization.py b/src/fastmcp/server/auth/authorization.py
index 1ae599260c..ae9e64a5b1 100644
--- a/src/fastmcp/server/auth/authorization.py
+++ b/src/fastmcp/server/auth/authorization.py
@@ -1,7 +1,7 @@
-"""Authorization checks for FastMCP tools.
+"""Authorization checks for FastMCP components.
-This module provides callable-based authorization for tools. Auth checks
-are functions that receive an AuthContext and return True to allow access
+This module provides callable-based authorization for tools, resources, and prompts.
+Auth checks are functions that receive an AuthContext and return True to allow access
or False to deny.
Auth checks can also raise exceptions:
@@ -18,19 +18,20 @@
@mcp.tool(auth=require_auth)
def protected_tool(): ...
- @mcp.tool(auth=require_scopes("admin"))
- def admin_tool(): ...
+ @mcp.resource("data://secret", auth=require_scopes("read"))
+ def secret_data(): ...
- @mcp.tool(auth=[require_auth, require_scopes("write")])
- def write_tool(): ...
+ @mcp.prompt(auth=require_auth)
+ def admin_prompt(): ...
```
"""
from __future__ import annotations
import logging
+from collections.abc import Callable
from dataclasses import dataclass
-from typing import TYPE_CHECKING, Callable, cast
+from typing import TYPE_CHECKING, cast
from fastmcp.exceptions import AuthorizationError
@@ -39,6 +40,7 @@ def write_tool(): ...
if TYPE_CHECKING:
from fastmcp.server.auth import AccessToken
from fastmcp.tools.tool import Tool
+ from fastmcp.utilities.components import FastMCPComponent
@dataclass
@@ -46,15 +48,26 @@ class AuthContext:
"""Context passed to auth check callables.
This object is passed to each auth check function and provides
- access to the current authentication token and the tool being accessed.
+ access to the current authentication token and the component being accessed.
Attributes:
token: The current access token, or None if unauthenticated.
- tool: The tool being accessed.
+ component: The component (tool, resource, or prompt) being accessed.
+ tool: Backwards-compatible alias for component when it's a Tool.
"""
token: AccessToken | None
- tool: Tool
+ component: FastMCPComponent
+
+ @property
+ def tool(self) -> Tool | None:
+ """Backwards-compatible access to the component as a Tool.
+
+ Returns the component if it's a Tool, None otherwise.
+ """
+ from fastmcp.tools.tool import Tool
+
+ return self.component if isinstance(self.component, Tool) else None
# Type alias for auth check functions
@@ -104,10 +117,10 @@ def check(ctx: AuthContext) -> bool:
def restrict_tag(tag: str, *, scopes: list[str]) -> AuthCheck:
- """Restrict tools with a specific tag to require certain scopes.
+ """Restrict components with a specific tag to require certain scopes.
- If the tool has the specified tag, the token must have ALL the
- required scopes. If the tool doesn't have the tag, access is allowed.
+ If the component has the specified tag, the token must have ALL the
+ required scopes. If the component doesn't have the tag, access is allowed.
Args:
tag: The tag that triggers the scope requirement.
@@ -115,14 +128,14 @@ def restrict_tag(tag: str, *, scopes: list[str]) -> AuthCheck:
Example:
```python
- # Tools tagged "admin" require the "admin" scope
+ # Components tagged "admin" require the "admin" scope
AuthMiddleware(auth=restrict_tag("admin", scopes=["admin"]))
```
"""
required = set(scopes)
def check(ctx: AuthContext) -> bool:
- if tag not in ctx.tool.tags:
+ if tag not in ctx.component.tags:
return True # Tag not present, no restriction
if ctx.token is None:
return False
diff --git a/src/fastmcp/server/middleware/authorization.py b/src/fastmcp/server/middleware/authorization.py
index 72ea81457c..5e2fa09d12 100644
--- a/src/fastmcp/server/middleware/authorization.py
+++ b/src/fastmcp/server/middleware/authorization.py
@@ -1,7 +1,7 @@
"""Authorization middleware for FastMCP.
This module provides middleware-based authorization using callable auth checks.
-AuthMiddleware applies auth checks globally to all tools on the server.
+AuthMiddleware applies auth checks globally to all components on the server.
Example:
```python
@@ -9,12 +9,12 @@
from fastmcp.server.auth import require_auth, require_scopes, restrict_tag
from fastmcp.server.middleware import AuthMiddleware
- # Require auth for all tools
+ # Require auth for all components
mcp = FastMCP(middleware=[
AuthMiddleware(auth=require_auth)
])
- # Tag-based: tools tagged "admin" require "admin" scope
+ # Tag-based: components tagged "admin" require "admin" scope
mcp = FastMCP(middleware=[
AuthMiddleware(auth=restrict_tag("admin", scopes=["admin"]))
])
@@ -29,6 +29,9 @@
import mcp.types as mt
from fastmcp.exceptions import AuthorizationError
+from fastmcp.prompts.prompt import Prompt, PromptResult
+from fastmcp.resources.resource import Resource, ResourceResult
+from fastmcp.resources.template import ResourceTemplate
from fastmcp.server.auth.authorization import (
AuthCheck,
AuthContext,
@@ -48,12 +51,14 @@
class AuthMiddleware(Middleware):
"""Global authorization middleware using callable checks.
- This middleware applies auth checks to all tools on the server.
- It uses the same callable API as tool-level auth checks.
+ This middleware applies auth checks to all components (tools, resources,
+ prompts) on the server. It uses the same callable API as component-level
+ auth checks.
The middleware:
- - Filters tools from list_tools response based on auth checks
- - Checks auth before tool execution in call_tool
+ - Filters tools/resources/prompts from list responses based on auth checks
+ - Checks auth before tool execution, resource read, and prompt render
+ - Skips all auth checks for STDIO transport (no OAuth concept)
Args:
auth: A single auth check function or list of check functions.
@@ -64,10 +69,10 @@ class AuthMiddleware(Middleware):
from fastmcp import FastMCP
from fastmcp.server.auth import require_auth, require_scopes
- # Require any authentication for all tools
+ # Require any authentication for all components
mcp = FastMCP(middleware=[AuthMiddleware(auth=require_auth)])
- # Require specific scope for all tools
+ # Require specific scope for all components
mcp = FastMCP(middleware=[AuthMiddleware(auth=require_scopes("api"))])
# Combined checks (AND logic)
@@ -99,7 +104,7 @@ async def on_list_tools(
authorized_tools: list[Tool] = []
for tool in tools:
- ctx = AuthContext(token=token, tool=tool)
+ ctx = AuthContext(token=token, component=tool)
if run_auth_checks(self.auth, ctx):
authorized_tools.append(tool)
@@ -134,7 +139,7 @@ async def on_call_tool(
tool = await fastmcp.fastmcp.get_tool(tool_name)
token = get_access_token()
- ctx = AuthContext(token=token, tool=tool)
+ ctx = AuthContext(token=token, component=tool)
if not run_auth_checks(self.auth, ctx):
raise AuthorizationError(
@@ -142,3 +147,149 @@ async def on_call_tool(
)
return await call_next(context)
+
+ async def on_list_resources(
+ self,
+ context: MiddlewareContext[mt.ListResourcesRequest],
+ call_next: CallNext[mt.ListResourcesRequest, Sequence[Resource]],
+ ) -> Sequence[Resource]:
+ """Filter resources/list response based on auth checks."""
+ resources = await call_next(context)
+
+ # STDIO has no auth concept, skip filtering
+ from fastmcp.server.context import _current_transport
+
+ if _current_transport.get() == "stdio":
+ return resources
+
+ token = get_access_token()
+
+ authorized_resources: list[Resource] = []
+ for resource in resources:
+ ctx = AuthContext(token=token, component=resource)
+ if run_auth_checks(self.auth, ctx):
+ authorized_resources.append(resource)
+
+ return authorized_resources
+
+ async def on_read_resource(
+ self,
+ context: MiddlewareContext[mt.ReadResourceRequestParams],
+ call_next: CallNext[mt.ReadResourceRequestParams, ResourceResult],
+ ) -> ResourceResult:
+ """Check auth before resource read."""
+ # STDIO has no auth concept, skip enforcement
+ from fastmcp.server.context import _current_transport
+
+ if _current_transport.get() == "stdio":
+ return await call_next(context)
+
+ # Get the resource being read
+ uri = context.message.uri
+ fastmcp = context.fastmcp_context
+ if fastmcp is None:
+ logger.warning(
+ f"AuthMiddleware: fastmcp_context is None for resource '{uri}'. "
+ "Denying access for security."
+ )
+ raise AuthorizationError(
+ f"Authorization failed for resource '{uri}': missing context"
+ )
+
+ resource = await fastmcp.fastmcp.get_resource(str(uri))
+
+ token = get_access_token()
+ ctx = AuthContext(token=token, component=resource)
+
+ if not run_auth_checks(self.auth, ctx):
+ raise AuthorizationError(
+ f"Authorization failed for resource '{uri}': insufficient permissions"
+ )
+
+ return await call_next(context)
+
+ async def on_list_resource_templates(
+ self,
+ context: MiddlewareContext[mt.ListResourceTemplatesRequest],
+ call_next: CallNext[
+ mt.ListResourceTemplatesRequest, Sequence[ResourceTemplate]
+ ],
+ ) -> Sequence[ResourceTemplate]:
+ """Filter resource templates/list response based on auth checks."""
+ templates = await call_next(context)
+
+ # STDIO has no auth concept, skip filtering
+ from fastmcp.server.context import _current_transport
+
+ if _current_transport.get() == "stdio":
+ return templates
+
+ token = get_access_token()
+
+ authorized_templates: list[ResourceTemplate] = []
+ for template in templates:
+ ctx = AuthContext(token=token, component=template)
+ if run_auth_checks(self.auth, ctx):
+ authorized_templates.append(template)
+
+ return authorized_templates
+
+ async def on_list_prompts(
+ self,
+ context: MiddlewareContext[mt.ListPromptsRequest],
+ call_next: CallNext[mt.ListPromptsRequest, Sequence[Prompt]],
+ ) -> Sequence[Prompt]:
+ """Filter prompts/list response based on auth checks."""
+ prompts = await call_next(context)
+
+ # STDIO has no auth concept, skip filtering
+ from fastmcp.server.context import _current_transport
+
+ if _current_transport.get() == "stdio":
+ return prompts
+
+ token = get_access_token()
+
+ authorized_prompts: list[Prompt] = []
+ for prompt in prompts:
+ ctx = AuthContext(token=token, component=prompt)
+ if run_auth_checks(self.auth, ctx):
+ authorized_prompts.append(prompt)
+
+ return authorized_prompts
+
+ async def on_get_prompt(
+ self,
+ context: MiddlewareContext[mt.GetPromptRequestParams],
+ call_next: CallNext[mt.GetPromptRequestParams, PromptResult],
+ ) -> PromptResult:
+ """Check auth before prompt render."""
+ # STDIO has no auth concept, skip enforcement
+ from fastmcp.server.context import _current_transport
+
+ if _current_transport.get() == "stdio":
+ return await call_next(context)
+
+ # Get the prompt being rendered
+ prompt_name = context.message.name
+ fastmcp = context.fastmcp_context
+ if fastmcp is None:
+ logger.warning(
+ f"AuthMiddleware: fastmcp_context is None for prompt '{prompt_name}'. "
+ "Denying access for security."
+ )
+ raise AuthorizationError(
+ f"Authorization failed for prompt '{prompt_name}': missing context"
+ )
+
+ prompt = await fastmcp.fastmcp.get_prompt(prompt_name)
+
+ token = get_access_token()
+ ctx = AuthContext(token=token, component=prompt)
+
+ if not run_auth_checks(self.auth, ctx):
+ raise AuthorizationError(
+ f"Authorization failed for prompt '{prompt_name}': insufficient permissions"
+ )
+
+ return await call_next(context)
diff --git a/src/fastmcp/server/providers/local_provider.py b/src/fastmcp/server/providers/local_provider.py
index ad74530edb..d29daab6ec 100644
--- a/src/fastmcp/server/providers/local_provider.py
+++ b/src/fastmcp/server/providers/local_provider.py
@@ -555,6 +555,7 @@ def resource(
annotations: Annotations | dict[str, Any] | None = None,
meta: dict[str, Any] | None = None,
task: bool | TaskConfig | None = None,
+ auth: AuthCheckCallable | list[AuthCheckCallable] | None = None,
) -> Callable[[AnyFunction], Resource | ResourceTemplate]:
"""Decorator to register a function as a resource.
@@ -573,6 +574,7 @@ def resource(
annotations: Optional annotations about the resource's behavior
meta: Optional meta information about the resource
task: Optional task configuration for background execution
+ auth: Optional authorization checks for the resource
Returns:
A decorator function.
@@ -605,6 +607,7 @@ def get_weather(city: str) -> str:
annotations=annotations,
meta=meta,
task=supports_task,
+ auth=auth,
)
def decorator(fn: AnyFunction) -> Resource | ResourceTemplate:
@@ -635,6 +638,7 @@ def prompt(
enabled: bool = True,
meta: dict[str, Any] | None = None,
task: bool | TaskConfig | None = None,
+ auth: AuthCheckCallable | list[AuthCheckCallable] | None = None,
) -> FunctionPrompt: ...
@overload
@@ -650,6 +654,7 @@ def prompt(
enabled: bool = True,
meta: dict[str, Any] | None = None,
task: bool | TaskConfig | None = None,
+ auth: AuthCheckCallable | list[AuthCheckCallable] | None = None,
) -> Callable[[AnyFunction], FunctionPrompt]: ...
def prompt(
@@ -664,6 +669,7 @@ def prompt(
enabled: bool = True,
meta: dict[str, Any] | None = None,
task: bool | TaskConfig | None = None,
+ auth: AuthCheckCallable | list[AuthCheckCallable] | None = None,
) -> (
Callable[[AnyFunction], FunctionPrompt]
| FunctionPrompt
@@ -688,6 +694,7 @@ def prompt(
enabled: Whether the prompt is enabled (default True). If False, adds to blocklist.
meta: Optional meta information about the prompt
task: Optional task configuration for background execution
+ auth: Optional authorization checks for the prompt
Returns:
The registered FunctionPrompt or a decorator function.
@@ -728,6 +735,7 @@ def register(prompt_obj: FunctionPrompt) -> FunctionPrompt:
tags=tags,
meta=meta,
task=supports_task,
+ auth=auth,
)
# If standalone returned a FunctionPrompt directly (@prompt without parens),
diff --git a/src/fastmcp/server/server.py b/src/fastmcp/server/server.py
index f85eec7465..04a137e781 100644
--- a/src/fastmcp/server/server.py
+++ b/src/fastmcp/server/server.py
@@ -55,6 +55,7 @@
import fastmcp
import fastmcp.server
from fastmcp.exceptions import (
+ AuthorizationError,
DisabledError,
FastMCPError,
NotFoundError,
@@ -881,8 +882,12 @@ async def get_tools(self, *, run_middleware: bool = False) -> list[Tool]:
continue
# Check tool-level auth (skip for STDIO)
if not is_stdio and tool.auth is not None:
- ctx = AuthContext(token=token, tool=tool)
- if not run_auth_checks(tool.auth, ctx):
+ ctx = AuthContext(token=token, component=tool)
+ try:
+ if not run_auth_checks(tool.auth, ctx):
+ continue
+ except AuthorizationError:
+ # Treat auth errors as denials in list operations
continue
all_tools[tool.key] = tool
return list(all_tools.values())
@@ -915,7 +920,7 @@ async def get_tool(self, name: str) -> Tool:
if isinstance(result, Tool) and self._is_component_enabled(result):
# Check tool-level auth (skip for STDIO)
if not is_stdio and result.auth is not None:
- ctx = AuthContext(token=token, tool=result)
+ ctx = AuthContext(token=token, component=result)
if not run_auth_checks(result.auth, ctx):
continue
return result
@@ -955,6 +960,13 @@ async def get_resources(self, *, run_middleware: bool = False) -> list[Resource]
return_exceptions=True,
)
+ # STDIO has no auth concept, so skip auth checks entirely
+ # Late import to avoid circular import with context.py
+ from fastmcp.server.context import _current_transport
+
+ is_stdio = _current_transport.get() == "stdio"
+ token = None if is_stdio else get_access_token()
+
all_resources: dict[str, Resource] = {}
for i, result in enumerate(results):
if isinstance(result, BaseException):
@@ -964,24 +976,40 @@ async def get_resources(self, *, run_middleware: bool = False) -> list[Resource]
raise result
continue
for resource in result:
- if (
- self._is_component_enabled(resource)
- and resource.key not in all_resources
- ):
- all_resources[resource.key] = resource
+ if not self._is_component_enabled(resource):
+ continue
+ if resource.key in all_resources:
+ continue
+ # Check resource-level auth (skip for STDIO)
+ if not is_stdio and resource.auth is not None:
+ ctx = AuthContext(token=token, component=resource)
+ try:
+ if not run_auth_checks(resource.auth, ctx):
+ continue
+ except AuthorizationError:
+ # Treat auth errors as denials in list operations
+ continue
+ all_resources[resource.key] = resource
return list(all_resources.values())
async def get_resource(self, uri: str) -> Resource:
"""Get an enabled resource by URI.
Queries all providers in parallel to find the resource.
- First provider wins. Returns only if enabled.
+ First provider wins. Returns only if enabled and authorized.
"""
results = await gather(
*[p.get_resource(uri) for p in self._providers],
return_exceptions=True,
)
+ # STDIO has no auth concept, so skip auth checks entirely
+ # Late import to avoid circular import with context.py
+ from fastmcp.server.context import _current_transport
+
+ is_stdio = _current_transport.get() == "stdio"
+ token = None if is_stdio else get_access_token()
+
for i, result in enumerate(results):
if isinstance(result, BaseException):
if not isinstance(result, NotFoundError):
@@ -990,6 +1018,11 @@ async def get_resource(self, uri: str) -> Resource:
)
continue
if isinstance(result, Resource) and self._is_component_enabled(result):
+ # Check resource-level auth (skip for STDIO)
+ if not is_stdio and result.auth is not None:
+ ctx = AuthContext(token=token, component=result)
+ if not run_auth_checks(result.auth, ctx):
+ continue
return result
raise NotFoundError(f"Unknown resource: {uri}")
@@ -1029,6 +1062,13 @@ async def get_resource_templates(
return_exceptions=True,
)
+ # STDIO has no auth concept, so skip auth checks entirely
+ # Late import to avoid circular import with context.py
+ from fastmcp.server.context import _current_transport
+
+ is_stdio = _current_transport.get() == "stdio"
+ token = None if is_stdio else get_access_token()
+
all_templates: dict[str, ResourceTemplate] = {}
for i, result in enumerate(results):
if isinstance(result, BaseException):
@@ -1040,24 +1080,40 @@ async def get_resource_templates(
raise result
continue
for template in result:
- if (
- self._is_component_enabled(template)
- and template.key not in all_templates
- ):
- all_templates[template.key] = template
+ if not self._is_component_enabled(template):
+ continue
+ if template.key in all_templates:
+ continue
+ # Check template-level auth (skip for STDIO)
+ if not is_stdio and template.auth is not None:
+ ctx = AuthContext(token=token, component=template)
+ try:
+ if not run_auth_checks(template.auth, ctx):
+ continue
+ except AuthorizationError:
+ # Treat auth errors as denials in list operations
+ continue
+ all_templates[template.key] = template
return list(all_templates.values())
async def get_resource_template(self, uri: str) -> ResourceTemplate:
"""Get an enabled resource template that matches the given URI.
Queries all providers in parallel to find the template.
- First provider wins. Returns only if enabled.
+ First provider wins. Returns only if enabled and authorized.
"""
results = await gather(
*[p.get_resource_template(uri) for p in self._providers],
return_exceptions=True,
)
+ # STDIO has no auth concept, so skip auth checks entirely
+ # Late import to avoid circular import with context.py
+ from fastmcp.server.context import _current_transport
+
+ is_stdio = _current_transport.get() == "stdio"
+ token = None if is_stdio else get_access_token()
+
for i, result in enumerate(results):
if isinstance(result, BaseException):
if not isinstance(result, NotFoundError):
@@ -1068,6 +1124,11 @@ async def get_resource_template(self, uri: str) -> ResourceTemplate:
if isinstance(result, ResourceTemplate) and self._is_component_enabled(
result
):
+ # Check template-level auth (skip for STDIO)
+ if not is_stdio and result.auth is not None:
+ ctx = AuthContext(token=token, component=result)
+ if not run_auth_checks(result.auth, ctx):
+ continue
return result
raise NotFoundError(f"Unknown resource template: {uri}")
@@ -1105,6 +1166,13 @@ async def get_prompts(self, *, run_middleware: bool = False) -> list[Prompt]:
return_exceptions=True,
)
+ # STDIO has no auth concept, so skip auth checks entirely
+ # Late import to avoid circular import with context.py
+ from fastmcp.server.context import _current_transport
+
+ is_stdio = _current_transport.get() == "stdio"
+ token = None if is_stdio else get_access_token()
+
all_prompts: dict[str, Prompt] = {}
for i, result in enumerate(results):
if isinstance(result, BaseException):
@@ -1114,21 +1182,40 @@ async def get_prompts(self, *, run_middleware: bool = False) -> list[Prompt]:
raise result
continue
for prompt in result:
- if self._is_component_enabled(prompt) and prompt.key not in all_prompts:
- all_prompts[prompt.key] = prompt
+ if not self._is_component_enabled(prompt):
+ continue
+ if prompt.key in all_prompts:
+ continue
+ # Check prompt-level auth (skip for STDIO)
+ if not is_stdio and prompt.auth is not None:
+ ctx = AuthContext(token=token, component=prompt)
+ try:
+ if not run_auth_checks(prompt.auth, ctx):
+ continue
+ except AuthorizationError:
+ # Treat auth errors as denials in list operations
+ continue
+ all_prompts[prompt.key] = prompt
return list(all_prompts.values())
async def get_prompt(self, name: str) -> Prompt:
"""Get an enabled prompt by name.
Queries all providers in parallel to find the prompt.
- First provider wins. Returns only if enabled.
+ First provider wins. Returns only if enabled and authorized.
"""
results = await gather(
*[p.get_prompt(name) for p in self._providers],
return_exceptions=True,
)
+ # STDIO has no auth concept, so skip auth checks entirely
+ # Late import to avoid circular import with context.py
+ from fastmcp.server.context import _current_transport
+
+ is_stdio = _current_transport.get() == "stdio"
+ token = None if is_stdio else get_access_token()
+
for i, result in enumerate(results):
if isinstance(result, BaseException):
if not isinstance(result, NotFoundError):
@@ -1137,6 +1224,11 @@ async def get_prompt(self, name: str) -> Prompt:
)
continue
if isinstance(result, Prompt) and self._is_component_enabled(result):
+ # Check prompt-level auth (skip for STDIO)
+ if not is_stdio and result.auth is not None:
+ ctx = AuthContext(token=token, component=result)
+ if not run_auth_checks(result.auth, ctx):
+ continue
return result
raise NotFoundError(f"Unknown prompt: {name}")
@@ -1926,6 +2018,7 @@ def resource(
annotations: Annotations | dict[str, Any] | None = None,
meta: dict[str, Any] | None = None,
task: bool | TaskConfig | None = None,
+ auth: AuthCheckCallable | list[AuthCheckCallable] | None = None,
) -> Callable[[AnyFunction], Resource | ResourceTemplate]:
"""Decorator to register a function as a resource.
@@ -1990,6 +2083,7 @@ async def get_weather(city: str) -> str:
annotations=annotations,
meta=meta,
task=task if task is not None else self._support_tasks_by_default,
+ auth=auth,
)
def decorator(fn: AnyFunction) -> Resource | ResourceTemplate:
@@ -2020,6 +2114,7 @@ def prompt(
tags: set[str] | None = None,
meta: dict[str, Any] | None = None,
task: bool | TaskConfig | None = None,
+ auth: AuthCheckCallable | list[AuthCheckCallable] | None = None,
) -> FunctionPrompt: ...
@overload
@@ -2034,6 +2129,7 @@ def prompt(
tags: set[str] | None = None,
meta: dict[str, Any] | None = None,
task: bool | TaskConfig | None = None,
+ auth: AuthCheckCallable | list[AuthCheckCallable] | None = None,
) -> Callable[[AnyFunction], FunctionPrompt]: ...
def prompt(
@@ -2047,6 +2143,7 @@ def prompt(
tags: set[str] | None = None,
meta: dict[str, Any] | None = None,
task: bool | TaskConfig | None = None,
+ auth: AuthCheckCallable | list[AuthCheckCallable] | None = None,
) -> (
Callable[[AnyFunction], FunctionPrompt]
| FunctionPrompt
@@ -2130,6 +2227,7 @@ def another_prompt(data: str) -> list[Message]:
tags=tags,
meta=meta,
task=task if task is not None else self._support_tasks_by_default,
+ auth=auth,
)
async def run_stdio_async(
diff --git a/tests/server/auth/test_authorization.py b/tests/server/auth/test_authorization.py
index 051bef00ab..55acd5e7b1 100644
--- a/tests/server/auth/test_authorization.py
+++ b/tests/server/auth/test_authorization.py
@@ -19,7 +19,6 @@
)
from fastmcp.server.middleware import AuthMiddleware
-
# =============================================================================
# Test helpers
# =============================================================================
@@ -50,11 +49,11 @@ def make_tool() -> Mock:
class TestRequireAuth:
def test_returns_true_with_token(self):
- ctx = AuthContext(token=make_token(), tool=make_tool())
+ ctx = AuthContext(token=make_token(), component=make_tool())
assert require_auth(ctx) is True
def test_returns_false_without_token(self):
- ctx = AuthContext(token=None, tool=make_tool())
+ ctx = AuthContext(token=None, component=make_tool())
assert require_auth(ctx) is False
@@ -66,30 +65,30 @@ def test_returns_false_without_token(self):
class TestRequireScopes:
def test_returns_true_with_matching_scope(self):
token = make_token(scopes=["admin"])
- ctx = AuthContext(token=token, tool=make_tool())
+ ctx = AuthContext(token=token, component=make_tool())
check = require_scopes("admin")
assert check(ctx) is True
def test_returns_true_with_all_required_scopes(self):
token = make_token(scopes=["read", "write", "admin"])
- ctx = AuthContext(token=token, tool=make_tool())
+ ctx = AuthContext(token=token, component=make_tool())
check = require_scopes("read", "write")
assert check(ctx) is True
def test_returns_false_with_missing_scope(self):
token = make_token(scopes=["read"])
- ctx = AuthContext(token=token, tool=make_tool())
+ ctx = AuthContext(token=token, component=make_tool())
check = require_scopes("admin")
assert check(ctx) is False
def test_returns_false_with_partial_scopes(self):
token = make_token(scopes=["read"])
- ctx = AuthContext(token=token, tool=make_tool())
+ ctx = AuthContext(token=token, component=make_tool())
check = require_scopes("read", "write")
assert check(ctx) is False
def test_returns_false_without_token(self):
- ctx = AuthContext(token=None, tool=make_tool())
+ ctx = AuthContext(token=None, component=make_tool())
check = require_scopes("admin")
assert check(ctx) is False
@@ -103,14 +102,14 @@ class TestRestrictTag:
def test_allows_access_when_tag_not_present(self):
tool = make_tool()
tool.tags = {"other"}
- ctx = AuthContext(token=None, tool=tool)
+ ctx = AuthContext(token=None, component=tool)
check = restrict_tag("admin", scopes=["admin"])
assert check(ctx) is True
def test_blocks_access_when_tag_present_without_token(self):
tool = make_tool()
tool.tags = {"admin"}
- ctx = AuthContext(token=None, tool=tool)
+ ctx = AuthContext(token=None, component=tool)
check = restrict_tag("admin", scopes=["admin"])
assert check(ctx) is False
@@ -118,7 +117,7 @@ def test_blocks_access_when_tag_present_without_scope(self):
tool = make_tool()
tool.tags = {"admin"}
token = make_token(scopes=["read"])
- ctx = AuthContext(token=token, tool=tool)
+ ctx = AuthContext(token=token, component=tool)
check = restrict_tag("admin", scopes=["admin"])
assert check(ctx) is False
@@ -126,7 +125,7 @@ def test_allows_access_when_tag_present_with_scope(self):
tool = make_tool()
tool.tags = {"admin"}
token = make_token(scopes=["admin"])
- ctx = AuthContext(token=token, tool=tool)
+ ctx = AuthContext(token=token, component=tool)
check = restrict_tag("admin", scopes=["admin"])
assert check(ctx) is True
@@ -138,34 +137,37 @@ def test_allows_access_when_tag_present_with_scope(self):
class TestRunAuthChecks:
def test_single_check_passes(self):
- ctx = AuthContext(token=make_token(), tool=make_tool())
+ ctx = AuthContext(token=make_token(), component=make_tool())
assert run_auth_checks(require_auth, ctx) is True
def test_single_check_fails(self):
- ctx = AuthContext(token=None, tool=make_tool())
+ ctx = AuthContext(token=None, component=make_tool())
assert run_auth_checks(require_auth, ctx) is False
def test_multiple_checks_all_pass(self):
token = make_token(scopes=["admin"])
- ctx = AuthContext(token=token, tool=make_tool())
+ ctx = AuthContext(token=token, component=make_tool())
checks = [require_auth, require_scopes("admin")]
assert run_auth_checks(checks, ctx) is True
def test_multiple_checks_one_fails(self):
token = make_token(scopes=["read"])
- ctx = AuthContext(token=token, tool=make_tool())
+ ctx = AuthContext(token=token, component=make_tool())
checks = [require_auth, require_scopes("admin")]
assert run_auth_checks(checks, ctx) is False
def test_empty_list_passes(self):
- ctx = AuthContext(token=None, tool=make_tool())
+ ctx = AuthContext(token=None, component=make_tool())
assert run_auth_checks([], ctx) is True
def test_custom_lambda_check(self):
token = make_token()
token.claims = {"level": 5}
- ctx = AuthContext(token=token, tool=make_tool())
- check = lambda ctx: ctx.token is not None and ctx.token.claims.get("level", 0) >= 3
+ ctx = AuthContext(token=token, component=make_tool())
+
+ def check(ctx: AuthContext) -> bool:
+ return ctx.token is not None and ctx.token.claims.get("level", 0) >= 3
+
assert run_auth_checks(check, ctx) is True
def test_authorization_error_propagates(self):
@@ -175,7 +177,7 @@ def test_authorization_error_propagates(self):
def custom_auth_check(ctx: AuthContext) -> bool:
raise AuthorizationError("Custom denial reason")
- ctx = AuthContext(token=make_token(), tool=make_tool())
+ ctx = AuthContext(token=make_token(), component=make_tool())
with pytest.raises(AuthorizationError, match="Custom denial reason"):
run_auth_checks(custom_auth_check, ctx)
@@ -185,7 +187,7 @@ def test_generic_exception_is_masked(self):
def buggy_auth_check(ctx: AuthContext) -> bool:
raise ValueError("Unexpected internal error")
- ctx = AuthContext(token=make_token(), tool=make_tool())
+ ctx = AuthContext(token=make_token(), component=make_tool())
# Should return False, not raise the ValueError
assert run_auth_checks(buggy_auth_check, ctx) is False
@@ -207,7 +209,7 @@ def check_3(ctx: AuthContext) -> bool:
call_order.append(3)
return True
- ctx = AuthContext(token=make_token(), tool=make_tool())
+ ctx = AuthContext(token=make_token(), component=make_tool())
with pytest.raises(AuthorizationError, match="Explicit denial"):
run_auth_checks([check_1, check_2, check_3], ctx)
@@ -543,7 +545,6 @@ def protected_tool(x: int) -> str:
class TestAuthMiddlewareCallTool:
async def test_middleware_blocks_call_without_auth(self):
"""AuthMiddleware should raise AuthorizationError on unauthorized call."""
- from fastmcp.exceptions import AuthorizationError
mcp = FastMCP(middleware=[AuthMiddleware(auth=require_auth)])
@@ -556,7 +557,10 @@ def my_tool() -> str:
with pytest.raises(Exception) as exc_info:
await client.call_tool("my_tool", {})
# The error message should indicate authorization failure
- assert "authorization" in str(exc_info.value).lower() or "insufficient" in str(exc_info.value).lower()
+ assert (
+ "authorization" in str(exc_info.value).lower()
+ or "insufficient" in str(exc_info.value).lower()
+ )
async def test_middleware_allows_call_with_auth(self):
"""AuthMiddleware should allow tool call with valid token."""
@@ -578,7 +582,6 @@ def my_tool() -> str:
async def test_middleware_blocks_call_with_wrong_scope(self):
"""AuthMiddleware should block calls when scope requirements aren't met."""
- from fastmcp.exceptions import AuthorizationError
mcp = FastMCP(middleware=[AuthMiddleware(auth=require_scopes("admin"))])
@@ -593,6 +596,9 @@ def admin_tool() -> str:
async with Client(mcp) as client:
with pytest.raises(Exception) as exc_info:
await client.call_tool("admin_tool", {})
- assert "authorization" in str(exc_info.value).lower() or "insufficient" in str(exc_info.value).lower()
+ assert (
+ "authorization" in str(exc_info.value).lower()
+ or "insufficient" in str(exc_info.value).lower()
+ )
finally:
auth_context_var.reset(tok)
From b71d0fa20d664875ba3edf6e3b1baab6d0ad33f1 Mon Sep 17 00:00:00 2001
From: Jeremiah Lowin <153965+jlowin@users.noreply.github.com>
Date: Mon, 12 Jan 2026 18:10:19 -0500
Subject: [PATCH 3/8] Exclude auth from serialization and fix AccessToken
claims
- Add exclude=True to auth field on Tool, Resource, ResourceTemplate, Prompt
- Fix AccessToken claims conversion to handle None values
---
src/fastmcp/prompts/prompt.py | 2 +-
src/fastmcp/resources/resource.py | 2 +-
src/fastmcp/resources/template.py | 4 +++-
src/fastmcp/server/dependencies.py | 2 +-
src/fastmcp/tools/tool.py | 2 +-
5 files changed, 7 insertions(+), 5 deletions(-)
diff --git a/src/fastmcp/prompts/prompt.py b/src/fastmcp/prompts/prompt.py
index 7fc691dd11..0b27bac384 100644
--- a/src/fastmcp/prompts/prompt.py
+++ b/src/fastmcp/prompts/prompt.py
@@ -202,7 +202,7 @@ class Prompt(FastMCPComponent):
default=None, description="Arguments that can be passed to the prompt"
)
auth: AuthCheckCallable | list[AuthCheckCallable] | None = Field(
- default=None, description="Authorization checks for this prompt"
+ default=None, description="Authorization checks for this prompt", exclude=True
)
def to_mcp_prompt(
diff --git a/src/fastmcp/resources/resource.py b/src/fastmcp/resources/resource.py
index ca81676908..e39046338c 100644
--- a/src/fastmcp/resources/resource.py
+++ b/src/fastmcp/resources/resource.py
@@ -232,7 +232,7 @@ class Resource(FastMCPComponent):
] = None
auth: Annotated[
AuthCheckCallable | list[AuthCheckCallable] | None,
- Field(description="Authorization checks for this resource"),
+ Field(description="Authorization checks for this resource", exclude=True),
] = None
@staticmethod
diff --git a/src/fastmcp/resources/template.py b/src/fastmcp/resources/template.py
index be1597742a..fb3b0eae88 100644
--- a/src/fastmcp/resources/template.py
+++ b/src/fastmcp/resources/template.py
@@ -116,7 +116,9 @@ class ResourceTemplate(FastMCPComponent):
default=None, description="Optional annotations about the resource's behavior"
)
auth: AuthCheckCallable | list[AuthCheckCallable] | None = Field(
- default=None, description="Authorization checks for this resource template"
+ default=None,
+ description="Authorization checks for this resource template",
+ exclude=True,
)
def __repr__(self) -> str:
diff --git a/src/fastmcp/server/dependencies.py b/src/fastmcp/server/dependencies.py
index d2698670c0..28987431c8 100644
--- a/src/fastmcp/server/dependencies.py
+++ b/src/fastmcp/server/dependencies.py
@@ -417,7 +417,7 @@ def get_access_token() -> AccessToken | None:
# Optional fields
expires_at=access_token_as_dict.get("expires_at"),
resource=access_token_as_dict.get("resource"),
- claims=access_token_as_dict.get("claims"),
+ claims=access_token_as_dict.get("claims") or {},
)
except Exception as e:
raise TypeError(
diff --git a/src/fastmcp/tools/tool.py b/src/fastmcp/tools/tool.py
index 1c6db5e9a3..c73555dafc 100644
--- a/src/fastmcp/tools/tool.py
+++ b/src/fastmcp/tools/tool.py
@@ -172,7 +172,7 @@ class Tool(FastMCPComponent):
] = None
auth: Annotated[
AuthCheckCallable | list[AuthCheckCallable] | None,
- Field(description="Authorization checks for this tool"),
+ Field(description="Authorization checks for this tool", exclude=True),
] = None
@model_validator(mode="after")
From 9335be4d05ca027881cfcba80b81a6c1713a4f1d Mon Sep 17 00:00:00 2001
From: Jeremiah Lowin <153965+jlowin@users.noreply.github.com>
Date: Mon, 12 Jan 2026 18:11:31 -0500
Subject: [PATCH 4/8] Fix template auth bypass and update doc comment
- Catch NotFoundError in on_read_resource and fall back to get_resource_template
- Fix misleading comment about auth requirements for untagged tools
---
docs/servers/auth/authorization.mdx | 2 +-
src/fastmcp/server/middleware/authorization.py | 10 +++++++---
2 files changed, 8 insertions(+), 4 deletions(-)
diff --git a/docs/servers/auth/authorization.mdx b/docs/servers/auth/authorization.mdx
index e8f0929c35..a62fb87393 100644
--- a/docs/servers/auth/authorization.mdx
+++ b/docs/servers/auth/authorization.mdx
@@ -283,7 +283,7 @@ def update_record(id: str, data: str) -> str:
@mcp.tool
def read_record(id: str) -> str:
- """No tag restrictions, accessible to all authenticated users."""
+ """No tag restrictions, accessible to all (no auth required by this middleware)."""
return f"Record {id}"
```
diff --git a/src/fastmcp/server/middleware/authorization.py b/src/fastmcp/server/middleware/authorization.py
index 5e2fa09d12..f9f69ecd14 100644
--- a/src/fastmcp/server/middleware/authorization.py
+++ b/src/fastmcp/server/middleware/authorization.py
@@ -28,7 +28,7 @@
import mcp.types as mt
-from fastmcp.exceptions import AuthorizationError
+from fastmcp.exceptions import AuthorizationError, NotFoundError
from fastmcp.prompts.prompt import Prompt, PromptResult
from fastmcp.resources.resource import Resource, ResourceResult
from fastmcp.resources.template import ResourceTemplate
@@ -196,10 +196,14 @@ async def on_read_resource(
f"Authorization failed for resource '{uri}': missing context"
)
- resource = await fastmcp.fastmcp.get_resource(str(uri))
+ # Try concrete resource first, then template (for template-backed URIs)
+ try:
+ component = await fastmcp.fastmcp.get_resource(str(uri))
+ except NotFoundError:
+ component = await fastmcp.fastmcp.get_resource_template(str(uri))
token = get_access_token()
- ctx = AuthContext(token=token, component=resource)
+ ctx = AuthContext(token=token, component=component)
if not run_auth_checks(self.auth, ctx):
raise AuthorizationError(
From eca018125148298ca42f590ab6e1514da4e584e0 Mon Sep 17 00:00:00 2001
From: Jeremiah Lowin <153965+jlowin@users.noreply.github.com>
Date: Mon, 12 Jan 2026 18:15:26 -0500
Subject: [PATCH 5/8] Add authorization feature to v3 release notes
---
docs/development/v3-notes/v3-features.mdx | 53 +++++++++++++++++++++++
1 file changed, 53 insertions(+)
diff --git a/docs/development/v3-notes/v3-features.mdx b/docs/development/v3-notes/v3-features.mdx
index 95896a8e4e..7e49122a39 100644
--- a/docs/development/v3-notes/v3-features.mdx
+++ b/docs/development/v3-notes/v3-features.mdx
@@ -267,6 +267,59 @@ fastmcp dev server.py # Includes --reload by default
---
+## Component Authorization
+
+v3.0 introduces callable-based authorization for tools, resources, and prompts (`src/fastmcp/server/auth/authorization.py`).
+
+**Component-level auth**:
+
+```python
+from fastmcp import FastMCP
+from fastmcp.server.auth import require_auth, require_scopes
+
+mcp = FastMCP()
+
+@mcp.tool(auth=require_auth)
+def protected_tool(): ...
+
+@mcp.resource("data://secret", auth=require_scopes("read"))
+def secret_data(): ...
+
+@mcp.prompt(auth=require_scopes("admin"))
+def admin_prompt(): ...
+```
+
+**Server-wide auth via middleware**:
+
+```python
+from fastmcp.server.middleware import AuthMiddleware
+from fastmcp.server.auth import require_auth, restrict_tag
+
+# Require auth for all components
+mcp = FastMCP(middleware=[AuthMiddleware(auth=require_auth)])
+
+# Tag-based restrictions
+mcp = FastMCP(middleware=[
+ AuthMiddleware(auth=restrict_tag("admin", scopes=["admin"]))
+])
+```
+
+Built-in checks:
+- `require_auth`: Requires any valid token
+- `require_scopes(*scopes)`: Requires specific OAuth scopes
+- `restrict_tag(tag, scopes)`: Requires scopes only for tagged components
+
+Custom checks receive `AuthContext` with `token` and `component`:
+
+```python
+def custom_check(ctx: AuthContext) -> bool:
+ return ctx.token is not None and "admin" in ctx.token.scopes
+```
+
+STDIO transport bypasses all auth checks (no OAuth concept).
+
+---
+
## Deprecated Features
These emit deprecation warnings but continue to work.
From 24edfdf9c8de9c126bfdb7729d9b07045c5194ba Mon Sep 17 00:00:00 2001
From: Jeremiah Lowin <153965+jlowin@users.noreply.github.com>
Date: Mon, 12 Jan 2026 18:21:11 -0500
Subject: [PATCH 6/8] Address CodeRabbit nitpicks
- Propagate auth from template to ephemeral Resource in create_resource()
- Extract _get_auth_context() helper to reduce duplication in server.py
---
src/fastmcp/resources/template.py | 1 +
src/fastmcp/server/server.py | 98 +++++++++++++------------------
2 files changed, 42 insertions(+), 57 deletions(-)
diff --git a/src/fastmcp/resources/template.py b/src/fastmcp/resources/template.py
index fb3b0eae88..97a26c9a5a 100644
--- a/src/fastmcp/resources/template.py
+++ b/src/fastmcp/resources/template.py
@@ -378,6 +378,7 @@ async def resource_read_fn() -> str | bytes | ResourceResult:
mime_type=self.mime_type,
tags=self.tags,
task=self.task_config,
+ auth=self.auth,
)
async def read(self, arguments: dict[str, Any]) -> str | bytes | ResourceResult:
diff --git a/src/fastmcp/server/server.py b/src/fastmcp/server/server.py
index 04a137e781..d160fbbdba 100644
--- a/src/fastmcp/server/server.py
+++ b/src/fastmcp/server/server.py
@@ -158,6 +158,23 @@ def _resolve_on_duplicate(
]
+def _get_auth_context() -> tuple[bool, Any]:
+ """Get auth context for the current request.
+
+ Returns a tuple of (skip_auth, token) where:
+ - skip_auth=True means auth checks should be skipped (STDIO transport)
+ - token is the access token for HTTP transports (may be None if unauthenticated)
+
+ Uses late import to avoid circular import with context.py.
+ """
+ from fastmcp.server.context import _current_transport
+
+ is_stdio = _current_transport.get() == "stdio"
+ if is_stdio:
+ return (True, None)
+ return (False, get_access_token())
+
+
@asynccontextmanager
async def default_lifespan(server: FastMCP[LifespanResultT]) -> AsyncIterator[Any]:
"""Default lifespan context manager that does nothing.
@@ -861,13 +878,8 @@ async def get_tools(self, *, run_middleware: bool = False) -> list[Tool]:
return_exceptions=True,
)
- # Get token once for auth filtering (only applies to HTTP transports)
- # STDIO has no auth concept, so skip auth checks entirely
- # Late import to avoid circular import with context.py
- from fastmcp.server.context import _current_transport
-
- is_stdio = _current_transport.get() == "stdio"
- token = None if is_stdio else get_access_token()
+ # Get auth context (skip_auth=True for STDIO which has no auth concept)
+ skip_auth, token = _get_auth_context()
all_tools: dict[str, Tool] = {}
for i, result in enumerate(results):
@@ -881,7 +893,7 @@ async def get_tools(self, *, run_middleware: bool = False) -> list[Tool]:
if not self._is_component_enabled(tool) or tool.key in all_tools:
continue
# Check tool-level auth (skip for STDIO)
- if not is_stdio and tool.auth is not None:
+ if not skip_auth and tool.auth is not None:
ctx = AuthContext(token=token, component=tool)
try:
if not run_auth_checks(tool.auth, ctx):
@@ -903,12 +915,8 @@ async def get_tool(self, name: str) -> Tool:
return_exceptions=True,
)
- # STDIO has no auth concept, so skip auth checks entirely
- # Late import to avoid circular import with context.py
- from fastmcp.server.context import _current_transport
-
- is_stdio = _current_transport.get() == "stdio"
- token = None if is_stdio else get_access_token()
+ # Get auth context (skip_auth=True for STDIO which has no auth concept)
+ skip_auth, token = _get_auth_context()
for i, result in enumerate(results):
if isinstance(result, BaseException):
@@ -919,7 +927,7 @@ async def get_tool(self, name: str) -> Tool:
continue
if isinstance(result, Tool) and self._is_component_enabled(result):
# Check tool-level auth (skip for STDIO)
- if not is_stdio and result.auth is not None:
+ if not skip_auth and result.auth is not None:
ctx = AuthContext(token=token, component=result)
if not run_auth_checks(result.auth, ctx):
continue
@@ -960,12 +968,8 @@ async def get_resources(self, *, run_middleware: bool = False) -> list[Resource]
return_exceptions=True,
)
- # STDIO has no auth concept, so skip auth checks entirely
- # Late import to avoid circular import with context.py
- from fastmcp.server.context import _current_transport
-
- is_stdio = _current_transport.get() == "stdio"
- token = None if is_stdio else get_access_token()
+ # Get auth context (skip_auth=True for STDIO which has no auth concept)
+ skip_auth, token = _get_auth_context()
all_resources: dict[str, Resource] = {}
for i, result in enumerate(results):
@@ -981,7 +985,7 @@ async def get_resources(self, *, run_middleware: bool = False) -> list[Resource]
if resource.key in all_resources:
continue
# Check resource-level auth (skip for STDIO)
- if not is_stdio and resource.auth is not None:
+ if not skip_auth and resource.auth is not None:
ctx = AuthContext(token=token, component=resource)
try:
if not run_auth_checks(resource.auth, ctx):
@@ -1003,12 +1007,8 @@ async def get_resource(self, uri: str) -> Resource:
return_exceptions=True,
)
- # STDIO has no auth concept, so skip auth checks entirely
- # Late import to avoid circular import with context.py
- from fastmcp.server.context import _current_transport
-
- is_stdio = _current_transport.get() == "stdio"
- token = None if is_stdio else get_access_token()
+ # Get auth context (skip_auth=True for STDIO which has no auth concept)
+ skip_auth, token = _get_auth_context()
for i, result in enumerate(results):
if isinstance(result, BaseException):
@@ -1019,7 +1019,7 @@ async def get_resource(self, uri: str) -> Resource:
continue
if isinstance(result, Resource) and self._is_component_enabled(result):
# Check resource-level auth (skip for STDIO)
- if not is_stdio and result.auth is not None:
+ if not skip_auth and result.auth is not None:
ctx = AuthContext(token=token, component=result)
if not run_auth_checks(result.auth, ctx):
continue
@@ -1062,12 +1062,8 @@ async def get_resource_templates(
return_exceptions=True,
)
- # STDIO has no auth concept, so skip auth checks entirely
- # Late import to avoid circular import with context.py
- from fastmcp.server.context import _current_transport
-
- is_stdio = _current_transport.get() == "stdio"
- token = None if is_stdio else get_access_token()
+ # Get auth context (skip_auth=True for STDIO which has no auth concept)
+ skip_auth, token = _get_auth_context()
all_templates: dict[str, ResourceTemplate] = {}
for i, result in enumerate(results):
@@ -1085,7 +1081,7 @@ async def get_resource_templates(
if template.key in all_templates:
continue
# Check template-level auth (skip for STDIO)
- if not is_stdio and template.auth is not None:
+ if not skip_auth and template.auth is not None:
ctx = AuthContext(token=token, component=template)
try:
if not run_auth_checks(template.auth, ctx):
@@ -1107,12 +1103,8 @@ async def get_resource_template(self, uri: str) -> ResourceTemplate:
return_exceptions=True,
)
- # STDIO has no auth concept, so skip auth checks entirely
- # Late import to avoid circular import with context.py
- from fastmcp.server.context import _current_transport
-
- is_stdio = _current_transport.get() == "stdio"
- token = None if is_stdio else get_access_token()
+ # Get auth context (skip_auth=True for STDIO which has no auth concept)
+ skip_auth, token = _get_auth_context()
for i, result in enumerate(results):
if isinstance(result, BaseException):
@@ -1125,7 +1117,7 @@ async def get_resource_template(self, uri: str) -> ResourceTemplate:
result
):
# Check template-level auth (skip for STDIO)
- if not is_stdio and result.auth is not None:
+ if not skip_auth and result.auth is not None:
ctx = AuthContext(token=token, component=result)
if not run_auth_checks(result.auth, ctx):
continue
@@ -1166,12 +1158,8 @@ async def get_prompts(self, *, run_middleware: bool = False) -> list[Prompt]:
return_exceptions=True,
)
- # STDIO has no auth concept, so skip auth checks entirely
- # Late import to avoid circular import with context.py
- from fastmcp.server.context import _current_transport
-
- is_stdio = _current_transport.get() == "stdio"
- token = None if is_stdio else get_access_token()
+ # Get auth context (skip_auth=True for STDIO which has no auth concept)
+ skip_auth, token = _get_auth_context()
all_prompts: dict[str, Prompt] = {}
for i, result in enumerate(results):
@@ -1187,7 +1175,7 @@ async def get_prompts(self, *, run_middleware: bool = False) -> list[Prompt]:
if prompt.key in all_prompts:
continue
# Check prompt-level auth (skip for STDIO)
- if not is_stdio and prompt.auth is not None:
+ if not skip_auth and prompt.auth is not None:
ctx = AuthContext(token=token, component=prompt)
try:
if not run_auth_checks(prompt.auth, ctx):
@@ -1209,12 +1197,8 @@ async def get_prompt(self, name: str) -> Prompt:
return_exceptions=True,
)
- # STDIO has no auth concept, so skip auth checks entirely
- # Late import to avoid circular import with context.py
- from fastmcp.server.context import _current_transport
-
- is_stdio = _current_transport.get() == "stdio"
- token = None if is_stdio else get_access_token()
+ # Get auth context (skip_auth=True for STDIO which has no auth concept)
+ skip_auth, token = _get_auth_context()
for i, result in enumerate(results):
if isinstance(result, BaseException):
@@ -1225,7 +1209,7 @@ async def get_prompt(self, name: str) -> Prompt:
continue
if isinstance(result, Prompt) and self._is_component_enabled(result):
# Check prompt-level auth (skip for STDIO)
- if not is_stdio and result.auth is not None:
+ if not skip_auth and result.auth is not None:
ctx = AuthContext(token=token, component=result)
if not run_auth_checks(result.auth, ctx):
continue
From 5dc1bd3cf537c9bf2c41302595cfea1075d0d933 Mon Sep 17 00:00:00 2001
From: Jeremiah Lowin <153965+jlowin@users.noreply.github.com>
Date: Mon, 12 Jan 2026 19:38:20 -0500
Subject: [PATCH 7/8] Add authorization to docs.json and icon to lifespan
---
docs/docs.json | 1 +
docs/servers/lifespan.mdx | 1 +
2 files changed, 2 insertions(+)
diff --git a/docs/docs.json b/docs/docs.json
index 8105d89245..623551c6f0 100644
--- a/docs/docs.json
+++ b/docs/docs.json
@@ -138,6 +138,7 @@
"pages": [
"servers/auth/authentication",
"servers/auth/token-verification",
+ "servers/auth/authorization",
"servers/auth/remote-oauth",
"servers/auth/oauth-proxy",
"servers/auth/oidc-proxy",
diff --git a/docs/servers/lifespan.mdx b/docs/servers/lifespan.mdx
index 9171ed8929..442c38c03b 100644
--- a/docs/servers/lifespan.mdx
+++ b/docs/servers/lifespan.mdx
@@ -1,6 +1,7 @@
---
title: Lifespans
description: Server-level setup and teardown with composable lifespans
+icon: heart-pulse
---
Lifespans let you run code once when the server starts and clean up when it stops. Unlike per-session handlers, lifespans run exactly once regardless of how many clients connect.
From f096301a1dd4b7bea8df9bc44324ad936131d95b Mon Sep 17 00:00:00 2001
From: Jeremiah Lowin <153965+jlowin@users.noreply.github.com>
Date: Mon, 12 Jan 2026 19:47:59 -0500
Subject: [PATCH 8/8] Update docs: authorization, lifespan, context transport
- Move authorization.mdx to servers/ and add to Features group
- Reorganize authorization doc structure (component-level, server-level, reference)
- Add tag: NEW to lifespan and authorization
- Add VersionBadge 3.0.0 to context transport section
---
docs/docs.json | 6 +-
docs/servers/{auth => }/authorization.mdx | 118 ++++++++--------------
docs/servers/context.mdx | 2 +
docs/servers/lifespan.mdx | 1 +
4 files changed, 48 insertions(+), 79 deletions(-)
rename docs/servers/{auth => }/authorization.mdx (69%)
diff --git a/docs/docs.json b/docs/docs.json
index 623551c6f0..2649240469 100644
--- a/docs/docs.json
+++ b/docs/docs.json
@@ -120,7 +120,7 @@
"group": "Features",
"icon": "stars",
"pages": [
- "servers/tasks",
+ "servers/authorization",
"servers/context",
"servers/elicitation",
"servers/icons",
@@ -129,7 +129,8 @@
"servers/middleware",
"servers/progress",
"servers/sampling",
- "servers/storage-backends"
+ "servers/storage-backends",
+ "servers/tasks"
]
},
{
@@ -138,7 +139,6 @@
"pages": [
"servers/auth/authentication",
"servers/auth/token-verification",
- "servers/auth/authorization",
"servers/auth/remote-oauth",
"servers/auth/oauth-proxy",
"servers/auth/oidc-proxy",
diff --git a/docs/servers/auth/authorization.mdx b/docs/servers/authorization.mdx
similarity index 69%
rename from docs/servers/auth/authorization.mdx
rename to docs/servers/authorization.mdx
index a62fb87393..ac65f21cb8 100644
--- a/docs/servers/auth/authorization.mdx
+++ b/docs/servers/authorization.mdx
@@ -1,32 +1,33 @@
---
title: Authorization
sidebarTitle: Authorization
-description: Control access to tools using callable-based authorization checks that filter visibility and enforce permissions.
+description: Control access to components using callable-based authorization checks that filter visibility and enforce permissions.
icon: shield-halved
+tag: NEW
---
import { VersionBadge } from "/snippets/version-badge.mdx"
-Authorization controls what authenticated users can do with your FastMCP server. While [authentication](/servers/auth/authentication) verifies identity (who you are), authorization determines access (what you can do). FastMCP provides a callable-based authorization system that works at both the tool level and globally via middleware.
+Authorization controls what authenticated users can do with your FastMCP server. While [authentication](/servers/auth/authentication) verifies identity (who you are), authorization determines access (what you can do). FastMCP provides a callable-based authorization system that works at both the component level and globally via middleware.
The authorization model centers on a simple concept: callable functions that receive context about the current request and return `True` to allow access or `False` to deny it. Multiple checks combine with AND logic, meaning all checks must pass for access to be granted.
-Authorization relies on OAuth tokens which are only available with HTTP transports (SSE, Streamable HTTP). In STDIO mode, there's no OAuth mechanism, so `get_access_token()` returns `None`. Tools with auth checks like `require_auth` will be hidden from STDIO clients.
+Authorization relies on OAuth tokens which are only available with HTTP transports (SSE, Streamable HTTP). In STDIO mode, there's no OAuth mechanism, so `get_access_token()` returns `None` and all auth checks are skipped.
## Auth Checks
-An auth check is any callable that accepts an `AuthContext` and returns a boolean. The `AuthContext` provides access to the current token (if any) and the tool being accessed.
+An auth check is any callable that accepts an `AuthContext` and returns a boolean. The `AuthContext` provides access to the current token (if any) and the component being accessed.
```python
from fastmcp.server.auth import AuthContext
def my_custom_check(ctx: AuthContext) -> bool:
# ctx.token is AccessToken | None
- # ctx.tool is the Tool being accessed
+ # ctx.component is the Tool, Resource, or Prompt being accessed
return ctx.token is not None and "special" in ctx.token.scopes
```
@@ -71,7 +72,7 @@ def read_write_operation() -> str:
### restrict_tag
-Tag-based restrictions apply scope requirements conditionally. If a tool has the specified tag, the token must have the required scopes. Tools without the tag are unaffected.
+Tag-based restrictions apply scope requirements conditionally. If a component has the specified tag, the token must have the required scopes. Components without the tag are unaffected.
```python
from fastmcp import FastMCP
@@ -96,7 +97,7 @@ def public_tool() -> str:
return "Anyone can access"
```
-## Combining Checks
+### Combining Checks
Multiple auth checks can be combined by passing a list. All checks must pass for authorization to succeed (AND logic).
@@ -112,9 +113,9 @@ def secure_admin_action() -> str:
return "Secure admin action"
```
-## Custom Auth Checks
+### Custom Auth Checks
-Any callable that accepts `AuthContext` and returns `bool` can serve as an auth check. This enables authorization logic based on token claims, tool metadata, or external systems.
+Any callable that accepts `AuthContext` and returns `bool` can serve as an auth check. This enables authorization logic based on token claims, component metadata, or external systems.
```python
from fastmcp import FastMCP
@@ -148,84 +149,57 @@ def advanced_feature() -> str:
return "Advanced feature"
```
-Lambda functions work for simple inline checks.
-
-```python
-from fastmcp import FastMCP
-
-mcp = FastMCP("Lambda Auth Server")
-
-@mcp.tool(auth=lambda ctx: ctx.token and ctx.token.claims.get("level", 0) >= 5)
-def level_restricted_tool() -> str:
- """Requires level 5+ in token claims."""
- return "High level access granted"
-```
-
-### Exception Handling in Auth Checks
-
Auth checks can raise exceptions for explicit denial with custom messages:
- **`AuthorizationError`**: Propagates with its custom message, useful for explaining why access was denied
- **Other exceptions**: Masked for security (logged internally, treated as denial)
```python
-from fastmcp import FastMCP
from fastmcp.server.auth import AuthContext
from fastmcp.exceptions import AuthorizationError
-mcp = FastMCP("Exception Auth Server")
-
def require_verified_email(ctx: AuthContext) -> bool:
"""Require verified email with explicit denial message."""
if ctx.token is None:
raise AuthorizationError("Authentication required")
if not ctx.token.claims.get("email_verified"):
- raise AuthorizationError("Email verification required to access this tool")
+ raise AuthorizationError("Email verification required")
return True
-
-@mcp.tool(auth=require_verified_email)
-def sensitive_operation() -> str:
- """Only accessible to users with verified email."""
- return "Operation completed"
```
-This pattern is useful when you want to provide actionable feedback to clients about why authorization failed.
-
-## Tool-Level Authorization
+## Component-Level Authorization
-The `auth` parameter on `@mcp.tool` controls visibility of individual tools. When a tool has auth checks that fail for the current request, it is hidden from `list_tools` responses. The tool simply doesn't appear in the available tools list.
+The `auth` parameter on decorators controls visibility of individual components. When auth checks fail for the current request, the component is hidden from list responses—it simply doesn't appear.
```python
from fastmcp import FastMCP
from fastmcp.server.auth import require_auth, require_scopes
-mcp = FastMCP("Tool Auth Server")
-
-@mcp.tool
-def public_tool() -> str:
- """Visible to everyone."""
- return "Public"
+mcp = FastMCP("Component Auth Server")
@mcp.tool(auth=require_auth)
def authenticated_tool() -> str:
"""Only visible to authenticated users."""
return "Authenticated"
-@mcp.tool(auth=require_scopes("admin"))
-def admin_tool() -> str:
+@mcp.resource("secret://data", auth=require_scopes("read"))
+def secret_resource() -> str:
+ """Only visible to users with 'read' scope."""
+ return "Secret data"
+
+@mcp.prompt(auth=require_scopes("admin"))
+def admin_prompt() -> str:
"""Only visible to users with 'admin' scope."""
- return "Admin"
+ return "Admin prompt content"
```
-An unauthenticated client sees only `public_tool`. An authenticated client without the "admin" scope sees `public_tool` and `authenticated_tool`. An authenticated client with the "admin" scope sees all three tools.
-
-Tool-level `auth` only controls visibility in `list_tools`. It does not block direct tool calls. If a client knows a tool name, they can attempt to call it directly. Use `AuthMiddleware` to enforce authorization on tool execution.
+Component-level `auth` only controls visibility in list operations. It does not block direct access. Use `AuthMiddleware` to enforce authorization on execution.
-## AuthMiddleware
+## Server-Level Authorization
-For server-wide authorization enforcement, use `AuthMiddleware`. This middleware applies auth checks globally to all tools and provides two critical functions: filtering `list_tools` responses and enforcing authorization on `call_tool` requests.
+For server-wide authorization enforcement, use `AuthMiddleware`. This middleware applies auth checks globally to all components—filtering list responses and blocking unauthorized execution.
```python
from fastmcp import FastMCP
@@ -245,18 +219,16 @@ def any_tool() -> str:
### Filtering vs Enforcement
-Understanding the difference between tool-level auth and `AuthMiddleware` is important for building secure servers:
-
-| Behavior | Tool-level `auth` | `AuthMiddleware` |
-|----------|-------------------|------------------|
-| Filters `list_tools` | Yes | Yes |
-| Blocks `call_tool` | No | Yes (raises `AuthorizationError`) |
+| Behavior | Component-level `auth` | `AuthMiddleware` |
+|----------|------------------------|------------------|
+| Filters list responses | Yes | Yes |
+| Blocks execution | No | Yes (raises `AuthorizationError`) |
-Tool-level auth is useful for hiding tools from unauthorized users while still allowing advanced clients to call them if needed. `AuthMiddleware` provides complete enforcement by raising `AuthorizationError` when unauthorized requests attempt to execute tools.
+Component-level auth is useful for hiding components from unauthorized users while still allowing advanced clients to access them directly. `AuthMiddleware` provides complete enforcement by raising `AuthorizationError` when unauthorized requests attempt execution.
### Tag-Based Global Authorization
-A common pattern uses `restrict_tag` with `AuthMiddleware` to apply scope requirements based on tool tags.
+A common pattern uses `restrict_tag` with `AuthMiddleware` to apply scope requirements based on component tags.
```python
from fastmcp import FastMCP
@@ -283,7 +255,7 @@ def update_record(id: str, data: str) -> str:
@mcp.tool
def read_record(id: str) -> str:
- """No tag restrictions, accessible to all (no auth required by this middleware)."""
+ """No tag restrictions, accessible to all."""
return f"Record {id}"
```
@@ -323,7 +295,9 @@ def user_dashboard() -> dict:
}
```
-## AccessToken
+## Reference
+
+### AccessToken
The `AccessToken` object contains information extracted from the OAuth token.
@@ -335,48 +309,40 @@ The `AccessToken` object contains information extracted from the OAuth token.
| `expires_at` | `datetime \| None` | Token expiration time |
| `claims` | `dict[str, Any]` | All JWT claims or custom token data |
-The `claims` dictionary contains the full decoded token payload for JWT tokens, enabling authorization decisions based on any claim present in the token.
-
-## AuthContext
+### AuthContext
The `AuthContext` dataclass is passed to all auth check functions.
| Property | Type | Description |
|----------|------|-------------|
| `token` | `AccessToken \| None` | Current access token, or `None` if unauthenticated |
-| `tool` | `Tool` | The tool being accessed |
+| `component` | `Tool \| Resource \| Prompt` | The component being accessed |
-Access to the tool object enables authorization decisions based on tool metadata like tags, name, or custom properties.
+Access to the component object enables authorization decisions based on metadata like tags, name, or custom properties.
```python
from fastmcp.server.auth import AuthContext
def require_matching_tag(ctx: AuthContext) -> bool:
- """Require a scope matching each of the tool's tags."""
+ """Require a scope matching each of the component's tags."""
if ctx.token is None:
return False
user_scopes = set(ctx.token.scopes)
- return ctx.tool.tags.issubset(user_scopes)
+ return ctx.component.tags.issubset(user_scopes)
```
-## Type Reference
-
-All authorization types are exported from `fastmcp.server.auth`:
+### Imports
```python
from fastmcp.server.auth import (
AccessToken, # Token with .token, .client_id, .scopes, .expires_at, .claims
- AuthContext, # Context with .token, .tool
+ AuthContext, # Context with .token, .component
AuthCheck, # Type alias: Callable[[AuthContext], bool]
require_auth, # Built-in: requires any valid token
require_scopes, # Built-in: requires specific scopes
restrict_tag, # Built-in: tag-based scope requirements
run_auth_checks, # Utility: run checks with AND logic
)
-```
-
-The `AuthMiddleware` is exported from `fastmcp.server.middleware`:
-```python
from fastmcp.server.middleware import AuthMiddleware
```
diff --git a/docs/servers/context.mdx b/docs/servers/context.mdx
index cf365cd28a..3dc1b85e71 100644
--- a/docs/servers/context.mdx
+++ b/docs/servers/context.mdx
@@ -302,6 +302,8 @@ async def my_tool(ctx: Context) -> None:
### Transport
+
+
The `ctx.transport` property indicates which transport is being used to run the server. This is useful when your tool needs to behave differently depending on whether the server is running over STDIO, SSE, or Streamable HTTP. For example, you might want to return shorter responses over STDIO or adjust timeout behavior based on transport characteristics.
The transport type is set once when the server starts and remains constant for the server's lifetime. It returns `None` when called outside of a server context (for example, in unit tests or when running code outside of an MCP request).
diff --git a/docs/servers/lifespan.mdx b/docs/servers/lifespan.mdx
index 442c38c03b..45e1f775e9 100644
--- a/docs/servers/lifespan.mdx
+++ b/docs/servers/lifespan.mdx
@@ -2,6 +2,7 @@
title: Lifespans
description: Server-level setup and teardown with composable lifespans
icon: heart-pulse
+tag: NEW
---
Lifespans let you run code once when the server starts and clean up when it stops. Unlike per-session handlers, lifespans run exactly once regardless of how many clients connect.