Skip to content
5 changes: 4 additions & 1 deletion src/fastmcp/client/transports.py
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,10 @@ def infer_transport(

# the transport is an http(s) URL
elif isinstance(transport, AnyUrl | str) and str(transport).startswith("http"):
return SSETransport(url=transport)
if str(transport).endswith("/sse"):
return SSETransport(url=transport)
else:
return StreamableHttpTransport(url=transport)

# the transport is a websocket URL
elif isinstance(transport, AnyUrl | str) and str(transport).startswith("ws"):
Expand Down
227 changes: 182 additions & 45 deletions src/fastmcp/server/http.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from __future__ import annotations

from collections.abc import Generator
from contextlib import contextmanager
from collections.abc import AsyncGenerator, Callable, Generator
from contextlib import asynccontextmanager, contextmanager
from contextvars import ContextVar
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, cast

from mcp.server.auth.middleware.auth_context import AuthContextMiddleware
from mcp.server.auth.middleware.bearer_auth import (
Expand All @@ -22,10 +22,12 @@
from starlette.routing import Mount, Route
from starlette.types import Receive, Scope, Send

# This import is vendored until it is finalized in the upstream SDK
from fastmcp.server.streamable_http_manager import StreamableHTTPSessionManager
from fastmcp.utilities.logging import get_logger

if TYPE_CHECKING:
from fastmcp import FastMCP
from fastmcp.server.server import FastMCP

logger = get_logger(__name__)

Expand Down Expand Up @@ -53,10 +55,92 @@ def __init__(self, app):
self.app = app

async def __call__(self, scope, receive, send):
with set_http_request(Request(scope)):
if scope["type"] == "http":
with set_http_request(Request(scope)):
await self.app(scope, receive, send)
else:
await self.app(scope, receive, send)


def setup_auth_middleware_and_routes(
auth_server_provider: OAuthAuthorizationServerProvider | None,
auth_settings: AuthSettings | None,
) -> tuple[list[Middleware], list[Route | Mount], list[str]]:
"""Set up authentication middleware and routes if auth is enabled.

Args:
auth_server_provider: The OAuth authorization server provider
auth_settings: The auth settings

Returns:
Tuple of (middleware, auth_routes, required_scopes)
"""
middleware: list[Middleware] = []
auth_routes: list[Route | Mount] = []
required_scopes: list[str] = []

if auth_server_provider:
if not auth_settings:
raise ValueError(
"auth_settings must be provided when auth_server_provider is specified"
)

middleware = [
Middleware(
AuthenticationMiddleware,
backend=BearerAuthBackend(provider=auth_server_provider),
),
Middleware(AuthContextMiddleware),
]

required_scopes = auth_settings.required_scopes or []

auth_routes.extend(
create_auth_routes(
provider=auth_server_provider,
issuer_url=auth_settings.issuer_url,
service_documentation_url=auth_settings.service_documentation_url,
client_registration_options=auth_settings.client_registration_options,
revocation_options=auth_settings.revocation_options,
)
)

return middleware, auth_routes, required_scopes


def create_base_app(
routes: list[Route | Mount],
middleware: list[Middleware],
debug: bool,
lifespan: Callable | None = None,
) -> Starlette:
"""Create a base Starlette app with common middleware and routes.

Args:
routes: List of routes to include in the app
middleware: List of middleware to include in the app
debug: Whether to enable debug mode
lifespan: Optional lifespan manager for the app

Returns:
A Starlette application
"""
# Always add RequestContextMiddleware as the outermost middleware
middleware.append(Middleware(RequestContextMiddleware))

# Create the app
app_kwargs = {
"debug": debug,
"routes": routes,
"middleware": middleware,
}

if lifespan:
app_kwargs["lifespan"] = lifespan

return Starlette(**app_kwargs)


def create_sse_app(
server: FastMCP,
message_path: str,
Expand Down Expand Up @@ -93,42 +177,17 @@ async def handle_sse(scope: Scope, receive: Receive, send: Send) -> Response:
)
return Response()

# Configure routes and middleware
routes: list[Route | Mount] = []
middleware: list[Middleware] = []

# Handle authentication configuration
if auth_server_provider:
# Ensure auth settings are provided when auth provider is present
if not auth_settings:
raise ValueError(
"auth_settings must be provided when auth_server_provider is specified"
)

# Configure auth middleware
middleware = [
Middleware(
AuthenticationMiddleware,
backend=BearerAuthBackend(provider=auth_server_provider),
),
Middleware(AuthContextMiddleware),
]
# Get auth middleware and routes
middleware, auth_routes, required_scopes = setup_auth_middleware_and_routes(
auth_server_provider, auth_settings
)

# Get required scopes for authentication
required_scopes = auth_settings.required_scopes or []

# Add auth routes
routes.extend(
create_auth_routes(
provider=auth_server_provider,
issuer_url=auth_settings.issuer_url,
service_documentation_url=auth_settings.service_documentation_url,
client_registration_options=auth_settings.client_registration_options,
revocation_options=auth_settings.revocation_options,
)
)
# Initialize routes with auth routes
routes: list[Route | Mount] = auth_routes.copy()

# Add authenticated routes
# Add SSE routes with or without auth
if auth_server_provider:
# Auth is enabled, wrap endpoints with RequireAuthMiddleware
routes.append(
Route(
sse_path,
Expand All @@ -143,7 +202,7 @@ async def handle_sse(scope: Scope, receive: Receive, send: Send) -> Response:
)
)
else:
# No authentication required
# No auth required
async def sse_endpoint(request: Request) -> Response:
return await handle_sse(request.scope, request.receive, request._send) # type: ignore[reportPrivateUsage]

Expand All @@ -163,10 +222,88 @@ async def sse_endpoint(request: Request) -> Response:

# Add custom routes with lowest precedence
if additional_routes:
routes.extend(additional_routes)
routes.extend(cast(list[Route | Mount], additional_routes))

# Add RequestContextMiddleware as the outermost middleware
middleware.append(Middleware(RequestContextMiddleware))
# Create and return the app
return create_base_app(routes, middleware, debug)


def create_streamable_http_app(
server: FastMCP,
streamable_http_path: str,
event_store: None = None,
auth_server_provider: OAuthAuthorizationServerProvider | None = None,
auth_settings: AuthSettings | None = None,
json_response: bool = False,
stateless_http: bool = False,
debug: bool = False,
additional_routes: list[Route] | list[Mount] | list[Route | Mount] | None = None,
) -> Starlette:
"""Return an instance of the StreamableHTTP server app.

Args:
server: The FastMCP server instance
streamable_http_path: Path for StreamableHTTP connections
event_store: Optional event store for session management
auth_server_provider: Optional auth provider
auth_settings: Optional auth settings
json_response: Whether to use JSON response format
stateless_http: Whether to use stateless mode (new transport per request)
debug: Whether to enable debug mode
additional_routes: Optional list of custom routes

Returns:
A Starlette application with StreamableHTTP support
"""
# Create session manager using the provided event store
session_manager = StreamableHTTPSessionManager(
app=server._mcp_server,
event_store=event_store,
json_response=json_response,
stateless=stateless_http,
)

# Create the ASGI handler
async def handle_streamable_http(
scope: Scope, receive: Receive, send: Send
) -> None:
await session_manager.handle_request(scope, receive, send)

# Get auth middleware and routes
middleware, auth_routes, required_scopes = setup_auth_middleware_and_routes(
auth_server_provider, auth_settings
)

# Initialize routes with auth routes
routes: list[Route | Mount] = auth_routes.copy()

# Add StreamableHTTP routes with or without auth
if auth_server_provider:
# Auth is enabled, wrap endpoint with RequireAuthMiddleware
routes.append(
Mount(
streamable_http_path,
app=RequireAuthMiddleware(handle_streamable_http, required_scopes),
)
)
else:
# No auth required
routes.append(
Mount(
streamable_http_path,
app=handle_streamable_http,
)
)

# Add custom routes with lowest precedence
if additional_routes:
routes.extend(cast(list[Route | Mount], additional_routes))

# Create a lifespan manager to start and stop the session manager
@asynccontextmanager
async def lifespan(app: Starlette) -> AsyncGenerator[None, None]:
async with session_manager.run():
yield

# Create and return the Starlette app with middleware
return Starlette(debug=debug, routes=routes, middleware=middleware)
# Create and return the app with lifespan
return create_base_app(routes, middleware, debug, lifespan)
59 changes: 53 additions & 6 deletions src/fastmcp/server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ def __init__(
"is specified"
)
self._auth_server_provider = auth_server_provider

self._additional_http_routes: list[Route] = []
self.dependencies = self.settings.dependencies

Expand All @@ -167,30 +168,36 @@ def instructions(self) -> str | None:
return self._mcp_server.instructions

async def run_async(
self, transport: Literal["stdio", "sse"] | None = None, **transport_kwargs: Any
self,
transport: Literal["stdio", "sse", "streamable-http"] | None = None,
**transport_kwargs: Any,
) -> None:
"""Run the FastMCP server asynchronously.

Args:
transport: Transport protocol to use ("stdio" or "sse")
transport: Transport protocol to use ("stdio", "sse", or "streamable-http")
"""
if transport is None:
transport = "stdio"
if transport not in ["stdio", "sse"]:
if transport not in ["stdio", "sse", "streamable-http"]:
raise ValueError(f"Unknown transport: {transport}")

if transport == "stdio":
await self.run_stdio_async(**transport_kwargs)
else: # transport == "sse"
elif transport == "sse":
await self.run_sse_async(**transport_kwargs)
else: # transport == "streamable-http"
await self.run_streamable_http_async(**transport_kwargs)

def run(
self, transport: Literal["stdio", "sse"] | None = None, **transport_kwargs: Any
self,
transport: Literal["stdio", "sse", "streamable-http"] | None = None,
**transport_kwargs: Any,
) -> None:
"""Run the FastMCP server. Note this is a synchronous function.

Args:
transport: Transport protocol to use ("stdio" or "sse")
transport: Transport protocol to use ("stdio", "sse", or "streamable-http")
"""
logger.info(f'Starting server "{self.name}"...')

Expand Down Expand Up @@ -743,6 +750,46 @@ def sse_app(self) -> Starlette:
additional_routes=self._additional_http_routes,
)

def streamable_http_app(self) -> Starlette:
"""Return an instance of the StreamableHTTP server app."""
from fastmcp.server.http import create_streamable_http_app

return create_streamable_http_app(
server=self,
streamable_http_path=self.settings.streamable_http_path,
event_store=None,
auth_server_provider=self._auth_server_provider,
auth_settings=self.settings.auth,
json_response=self.settings.json_response,
stateless_http=self.settings.stateless_http,
debug=self.settings.debug,
additional_routes=self._additional_http_routes,
)

async def run_streamable_http_async(
self,
host: str | None = None,
port: int | None = None,
log_level: str | None = None,
uvicorn_config: dict | None = None,
) -> None:
"""Run the server using StreamableHTTP transport."""
uvicorn_config = uvicorn_config or {}
uvicorn_config.setdefault("timeout_graceful_shutdown", 0)

app = self.streamable_http_app()

config = uvicorn.Config(
app,
host=host or self.settings.host,
port=port or self.settings.port,
log_level=log_level or self.settings.log_level.lower(),
lifespan="on",
**uvicorn_config,
)
server = uvicorn.Server(config)
await server.serve()

def mount(
self,
prefix: str,
Expand Down
Loading