Closed
Initial Checks
- I confirm that I'm using the latest version of Pydantic AI
- I confirm that I searched for my issue in https://github.com/pydantic/pydantic-ai/issues before opening this issue
Description
I'm using pydantic-ai==0.2.6. (With version 0.2.11, importing the custom SSE version of the MCP server failed, which looks like a bug, so I am temporarily staying on 0.2.6.)
If I pass mcp_servers=[server_amap, server_AgentMap] to the Agent, my code fails to run; the failure occurs when a function call is executed. If I comment out the mcp_servers argument and use no MCP servers, the code runs normally. server_AgentMap itself works correctly, so the problem is not with that server. Can you help me figure out what might be causing this issue?
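One way to narrow this down is to run the same prompt against each MCP server on its own and record which configuration raises. The sketch below is only a generic helper, not part of pydantic-ai: `isolate_failures` and the two stand-in checks are hypothetical names, and the stand-ins simulate a passing and a failing configuration rather than calling any real server.

```python
import asyncio
from typing import Awaitable, Callable, Dict, Optional


async def isolate_failures(
    checks: Dict[str, Callable[[], Awaitable[object]]],
) -> Dict[str, Optional[str]]:
    """Run each named async check in turn and record any exception.

    Returns a mapping of check name to None (the check passed) or to a
    short "ExceptionType: message" string (the check raised).
    """
    results: Dict[str, Optional[str]] = {}
    for name, check in checks.items():
        try:
            await check()
            results[name] = None
        except Exception as exc:  # keep going so every check is reported
            results[name] = f"{type(exc).__name__}: {exc}"
    return results


# Hypothetical stand-ins for "run the agent with only this server attached".
async def check_passes() -> str:
    return "ok"


async def check_fails() -> str:
    raise RuntimeError("simulated tool-call failure")


if __name__ == "__main__":
    report = asyncio.run(
        isolate_failures({"only_amap": check_passes, "only_AgentMap": check_fails})
    )
    for name, error in report.items():
        print(name, "->", error or "passed")
```

In the setup from this issue, each check could build an Agent with a single entry in mcp_servers and run the same prompt inside agent.run_mcp_servers(); that wiring is left out here and is an assumption about how the helper would be applied.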
Example Code Below
Example Code
import asyncio

from pydantic_ai import Agent
from pydantic_ai.mcp import MCPServerHTTP, MCPServerStdio
from pydantic_ai.models.openai import OpenAIModel
from pydantic_ai.providers.openai import OpenAIProvider

from my_mcp.prompt import DEFAULT_SYSTEM_PROMPT_TEMPLATE_ZH_pydantic

# AMap MCP server, started locally over stdio via npx
server_amap = MCPServerStdio(
    command=r"C:\Program Files\nodejs\npx.cmd",
    args=["-y", "@amap/amap-maps-mcp-server"],
    env={"AMAP_MAPS_API_KEY": "xxx"},
)
# Custom MCP server reached over SSE
server_AgentMap = MCPServerHTTP(url="http://my-serverip:port/sse/")

my_model = OpenAIModel(
    model_name='qwen-max',
    provider=OpenAIProvider(
        base_url='https://dashscope.aliyuncs.com/compatible-mode/v1',
        api_key='sk-xxx',
    ),
)

agent = Agent(
    model=my_model,
    model_settings={'parallel_tool_calls': True, 'seed': 2025},
    name='TravelAgent',
    mcp_servers=[server_amap, server_AgentMap],
    system_prompt=DEFAULT_SYSTEM_PROMPT_TEMPLATE_ZH_pydantic,
    retries=3,
)


async def main():
    async with agent.run_mcp_servers():
        # Prompt (Chinese): "Help me plan a trip to a meeting at the
        # Oriental Pearl Tower in Shanghai"
        # async with agent.run_stream('帮我规划下去上海东方明珠开会的行程') as response:
        #     print(await response.get_output())
        async with agent.run_stream('帮我规划下去上海东方明珠开会的行程') as result:
            async for message in result.stream_text(delta=True):
                print(message, end='')


if __name__ == '__main__':
    asyncio.run(main())
Python, Pydantic AI & LLM client version
Python 3.12.9
pydantic-ai 0.2.6