Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ dependencies = [
"python-dotenv>=1.1.0",
"exceptiongroup>=1.2.2",
"httpx>=0.28.1",
"mcp>=1.7.1,<2.0.0",
# "mcp>=1.7.1,<2.0.0",
# use git commit until 1.7.2 is released
"mcp",
"openapi-pydantic>=0.5.1",
"rich>=13.9.4",
"typer>=0.15.2",
Expand Down Expand Up @@ -62,6 +64,7 @@ Homepage = "https://gofastmcp.com"
Repository = "https://github.com/jlowin/fastmcp"
Documentation = "https://gofastmcp.com"


[build-system]
requires = ["hatchling", "uv-dynamic-versioning>=0.7.0"]
build-backend = "hatchling.build"
Expand All @@ -77,6 +80,9 @@ fallback-version = "0.0.0"


[tool.uv]

[tool.uv.sources]
mcp = { git = "https://github.com/modelcontextprotocol/python-sdk", rev = "a027d75f609000378522c5873c2a16aa1963d487" }
# uncomment to omit `dev` default group
# default-groups = []

Expand Down
157 changes: 140 additions & 17 deletions tests/server/test_lifespan.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@
from contextlib import asynccontextmanager

import anyio
import pytest
from mcp.server.fastmcp import Context, FastMCP
from mcp.server.lowlevel.server import NotificationOptions, Server
from mcp.server.models import InitializationOptions
from mcp.shared.message import SessionMessage
from mcp.types import (
ClientCapabilities,
Implementation,
Expand All @@ -14,9 +19,119 @@
)
from pydantic import TypeAdapter

from fastmcp import Context, FastMCP

@pytest.mark.anyio
async def test_lowlevel_server_lifespan():
"""Test that lifespan works in low-level server."""

@asynccontextmanager
async def test_lifespan(server: Server) -> AsyncIterator[dict[str, bool]]:
"""Test lifespan context that tracks startup/shutdown."""
context = {"started": False, "shutdown": False}
try:
context["started"] = True
yield context
finally:
context["shutdown"] = True

server = Server("test", lifespan=test_lifespan)

# Create memory streams for testing
send_stream1, receive_stream1 = anyio.create_memory_object_stream(100)
send_stream2, receive_stream2 = anyio.create_memory_object_stream(100)

# Create a tool that accesses lifespan context
@server.call_tool()
async def check_lifespan(name: str, arguments: dict) -> list:
ctx = server.request_context
assert isinstance(ctx.lifespan_context, dict)
assert ctx.lifespan_context["started"]
assert not ctx.lifespan_context["shutdown"]
return [{"type": "text", "text": "true"}]

# Run server in background task
async with (
anyio.create_task_group() as tg,
send_stream1,
receive_stream1,
send_stream2,
receive_stream2,
):

async def run_server():
await server.run(
receive_stream1,
send_stream2,
InitializationOptions(
server_name="test",
server_version="0.1.0",
capabilities=server.get_capabilities(
notification_options=NotificationOptions(),
experimental_capabilities={},
),
),
raise_exceptions=True,
)

tg.start_soon(run_server)

# Initialize the server
params = InitializeRequestParams(
protocolVersion="2024-11-05",
capabilities=ClientCapabilities(),
clientInfo=Implementation(name="test-client", version="0.1.0"),
)
await send_stream1.send(
SessionMessage(
JSONRPCMessage(
root=JSONRPCRequest(
jsonrpc="2.0",
id=1,
method="initialize",
params=TypeAdapter(InitializeRequestParams).dump_python(params),
)
)
)
)
response = await receive_stream2.receive()
response = response.message

# Send initialized notification
await send_stream1.send(
SessionMessage(
JSONRPCMessage(
root=JSONRPCNotification(
jsonrpc="2.0",
method="notifications/initialized",
)
)
)
)

# Call the tool to verify lifespan context
await send_stream1.send(
SessionMessage(
JSONRPCMessage(
root=JSONRPCRequest(
jsonrpc="2.0",
id=2,
method="tools/call",
params={"name": "check_lifespan", "arguments": {}},
)
)
)
)

# Get response and verify
response = await receive_stream2.receive()
response = response.message
assert response.root.result["content"][0]["text"] == "true"

# Cancel server task
tg.cancel_scope.cancel()


@pytest.mark.anyio
async def test_fastmcp_server_lifespan():
"""Test that lifespan works in FastMCP server."""

Expand Down Expand Up @@ -71,41 +186,49 @@ async def run_server():
clientInfo=Implementation(name="test-client", version="0.1.0"),
)
await send_stream1.send(
JSONRPCMessage(
root=JSONRPCRequest(
jsonrpc="2.0",
id=1,
method="initialize",
params=TypeAdapter(InitializeRequestParams).dump_python(params),
SessionMessage(
JSONRPCMessage(
root=JSONRPCRequest(
jsonrpc="2.0",
id=1,
method="initialize",
params=TypeAdapter(InitializeRequestParams).dump_python(params),
)
)
)
)
response = await receive_stream2.receive()
response = response.message

# Send initialized notification
await send_stream1.send(
JSONRPCMessage(
root=JSONRPCNotification(
jsonrpc="2.0",
method="notifications/initialized",
SessionMessage(
JSONRPCMessage(
root=JSONRPCNotification(
jsonrpc="2.0",
method="notifications/initialized",
)
)
)
)

# Call the tool to verify lifespan context
await send_stream1.send(
JSONRPCMessage(
root=JSONRPCRequest(
jsonrpc="2.0",
id=2,
method="tools/call",
params={"name": "check_lifespan", "arguments": {}},
SessionMessage(
JSONRPCMessage(
root=JSONRPCRequest(
jsonrpc="2.0",
id=2,
method="tools/call",
params={"name": "check_lifespan", "arguments": {}},
)
)
)
)

# Get response and verify
response = await receive_stream2.receive()
response = response.message
assert response.root.result["content"][0]["text"] == "true"

# Cancel server task
Expand Down
Loading
Loading