Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 132 additions & 39 deletions python/packages/core/agent_framework/_mcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from typing import TYPE_CHECKING, Any, Literal, TypedDict, cast

from opentelemetry import propagate
from opentelemetry import trace as otel_trace

from ._tools import FunctionTool
from ._types import (
Expand All @@ -25,6 +26,11 @@
Message,
)
from .exceptions import ToolException, ToolExecutionException
from .observability import (
OtelAttr,
create_mcp_client_span,
set_mcp_span_error,
)

if sys.version_info >= (3, 11):
from typing import Self # pragma: no cover
Expand Down Expand Up @@ -283,6 +289,13 @@ def __init__(
def __str__(self) -> str:
return f"MCPTool(name={self.name}, description={self.description})"

def _mcp_base_span_attributes(self) -> dict[str, Any]:
"""Return base MCP span attributes shared across all operations.

Subclasses override to add transport-specific attributes (server address, port, etc.).
"""
return {}

def _parse_prompt_result_from_mcp(
self,
mcp_type: types.GetPromptResult,
Expand Down Expand Up @@ -793,8 +806,10 @@ async def _connect_on_owner(self, *, reset: bool = False, load_configured: bool
inner_exception=ex if isinstance(ex, Exception) else None,
) from ex
try:
initialize_result = await session.initialize()
self._set_server_capabilities(getattr(initialize_result, "capabilities", None))
with create_mcp_client_span("initialize", attributes=self._mcp_base_span_attributes()) as init_span:
initialize_result = await session.initialize()
init_span.set_attribute(OtelAttr.MCP_PROTOCOL_VERSION, initialize_result.protocolVersion)
self._set_server_capabilities(getattr(initialize_result, "capabilities", None))
Comment thread
TaoChenOSU marked this conversation as resolved.
except (Exception, asyncio.CancelledError) as ex:
if await self._close_and_check_cancelled(ex):
raise
Expand All @@ -812,8 +827,10 @@ async def _connect_on_owner(self, *, reset: bool = False, load_configured: bool
self.session = session
elif self.session._request_id == 0: # type: ignore[attr-defined]
# If the session is not initialized, we need to reinitialize it
initialize_result = await self.session.initialize()
self._set_server_capabilities(getattr(initialize_result, "capabilities", None))
with create_mcp_client_span("initialize", attributes=self._mcp_base_span_attributes()) as init_span:
initialize_result = await self.session.initialize()
init_span.set_attribute(OtelAttr.MCP_PROTOCOL_VERSION, initialize_result.protocolVersion)
self._set_server_capabilities(getattr(initialize_result, "capabilities", None))
Comment thread
TaoChenOSU marked this conversation as resolved.
elif self._server_capabilities is None:
self._set_server_capabilities(getattr(self.session, "_server_capabilities", None))
logger.debug("Connected to MCP server: %s", self.session)
Expand Down Expand Up @@ -1057,7 +1074,8 @@ async def _load_prompts_locked(self) -> None:
"Skipping MCP prompt loading because the server did not advertise prompts support."
)
return
prompt_list = await self.session.list_prompts(params=params) # type: ignore[union-attr]
with create_mcp_client_span("prompts/list", attributes=self._mcp_base_span_attributes()):
prompt_list = await self.session.list_prompts(params=params) # type: ignore[union-attr]
Comment thread
TaoChenOSU marked this conversation as resolved.
break
except ClosedResourceError as cl_ex:
if attempt == 0:
Expand Down Expand Up @@ -1142,7 +1160,8 @@ async def _load_tools_locked(self) -> None:
if not self._supports_tools:
logger.debug("Skipping MCP tool loading because the server did not advertise tools support.")
return
tool_list = await self.session.list_tools(params=params) # type: ignore[union-attr]
with create_mcp_client_span("tools/list", attributes=self._mcp_base_span_attributes()):
tool_list = await self.session.list_tools(params=params) # type: ignore[union-attr]
Comment thread
TaoChenOSU marked this conversation as resolved.
break
except ClosedResourceError as cl_ex:
if attempt == 0:
Expand Down Expand Up @@ -1314,9 +1333,6 @@ async def call_tool(self, tool_name: str, **kwargs: Any) -> str | list[Content]:
ToolExecutionException: If the MCP server is not connected, tools are not loaded,
or the tool call fails.
"""
from anyio import ClosedResourceError
from mcp.shared.exceptions import McpError

if not self.load_tools_flag:
raise ToolExecutionException(
"Tools are not loaded for this server, please set load_tools=True in the constructor."
Expand Down Expand Up @@ -1365,7 +1381,28 @@ async def call_tool(self, tool_name: str, **kwargs: Any) -> str | list[Content]:
meta = _inject_otel_into_mcp_meta(request_meta)

parser = self.parse_tool_results or self._parse_tool_result_from_mcp
# Try the operation, reconnecting once if the connection is closed

# Build MCP span attributes for tools/call
mcp_span_attrs = self._mcp_base_span_attributes()
mcp_span_attrs.update({
OtelAttr.TOOL_NAME: tool_name,
OtelAttr.OPERATION: OtelAttr.TOOL_EXECUTION_OPERATION,
})
with create_mcp_client_span("tools/call", target=tool_name, attributes=mcp_span_attrs) as span: # type: ignore
return await self._call_tool_with_retries(tool_name, filtered_kwargs, meta, parser, span)

async def _call_tool_with_retries(
self,
tool_name: str,
filtered_kwargs: dict[str, Any],
meta: dict[str, Any] | None,
parser: Callable[..., str | list[Content]],
span: otel_trace.Span,
) -> str | list[Content]:
"""Execute the MCP tools/call RPC with retry logic."""
from anyio import ClosedResourceError
from mcp.shared.exceptions import McpError

for attempt in range(2):
try:
result = await self.session.call_tool(tool_name, arguments=filtered_kwargs, meta=meta) # type: ignore
Expand All @@ -1376,6 +1413,9 @@ async def call_tool(self, tool_name: str, **kwargs: Any) -> str | list[Content]:
if isinstance(parsed, list)
else str(parsed)
)
# Per OTel MCP semconv: set error.type="tool_error" for isError results
if span.is_recording():
set_mcp_span_error(span, "tool_error", text or str(parsed))
raise ToolExecutionException(text or str(parsed))
return parser(result)
except ToolExecutionException:
Expand All @@ -1387,6 +1427,8 @@ async def call_tool(self, tool_name: str, **kwargs: Any) -> str | list[Content]:
is_connection_lost = isinstance(call_ex, ClosedResourceError) or is_session_terminated
if not is_connection_lost:
error_message = call_ex.error.message if isinstance(call_ex, McpError) else str(call_ex)
if span.is_recording():
set_mcp_span_error(span, type(call_ex).__name__, error_message)
raise ToolExecutionException(error_message, inner_exception=call_ex) from call_ex

if attempt == 0:
Expand All @@ -1403,11 +1445,15 @@ async def call_tool(self, tool_name: str, **kwargs: Any) -> str | list[Content]:

# Second attempt also failed, give up.
logger.error("MCP connection closed unexpectedly after reconnection: %s", call_ex)
if span.is_recording():
set_mcp_span_error(span, type(call_ex).__name__, str(call_ex))
raise ToolExecutionException(
f"Failed to call tool '{tool_name}' - connection lost.",
inner_exception=call_ex,
) from call_ex
except Exception as ex:
if span.is_recording():
set_mcp_span_error(span, type(ex).__name__, str(ex))
raise ToolExecutionException(f"Failed to call tool '{tool_name}'.", inner_exception=ex) from ex
raise ToolExecutionException(f"Failed to call tool '{tool_name}' after retries.")

Expand Down Expand Up @@ -1436,36 +1482,42 @@ async def get_prompt(self, prompt_name: str, **kwargs: Any) -> str:
)

parser = self.parse_prompt_results or self._parse_prompt_result_from_mcp
# Try the operation, reconnecting once if the connection is closed
for attempt in range(2):
try:
prompt_result = await self.session.get_prompt(prompt_name, arguments=kwargs) # type: ignore
return parser(prompt_result)
except ClosedResourceError as cl_ex:
if attempt == 0:
# First attempt failed, try reconnecting
logger.info("MCP connection closed unexpectedly. Reconnecting...")
try:
await self.connect(reset=True)
continue # Retry the operation
except Exception as reconn_ex:
mcp_span_attrs = self._mcp_base_span_attributes()
mcp_span_attrs.update({OtelAttr.PROMPT_NAME: prompt_name})

with create_mcp_client_span("prompts/get", target=prompt_name, attributes=mcp_span_attrs) as span:
for attempt in range(2):
try:
prompt_result = await self.session.get_prompt(prompt_name, arguments=kwargs) # type: ignore
return parser(prompt_result)
except ClosedResourceError as cl_ex:
if attempt == 0:
# First attempt failed, try reconnecting
logger.info("MCP connection closed unexpectedly. Reconnecting...")
try:
await self.connect(reset=True)
continue # Retry the operation
except Exception as reconn_ex:
raise ToolExecutionException(
"Failed to reconnect to MCP server.",
inner_exception=reconn_ex,
) from reconn_ex
else:
# Second attempt also failed, give up
logger.error(f"MCP connection closed unexpectedly after reconnection: {cl_ex}")
set_mcp_span_error(span, type(cl_ex).__name__, str(cl_ex))
raise ToolExecutionException(
"Failed to reconnect to MCP server.",
inner_exception=reconn_ex,
) from reconn_ex
else:
# Second attempt also failed, give up
logger.error(f"MCP connection closed unexpectedly after reconnection: {cl_ex}")
raise ToolExecutionException(
f"Failed to call prompt '{prompt_name}' - connection lost.",
inner_exception=cl_ex,
) from cl_ex
except McpError as mcp_exc:
error_message = mcp_exc.error.message
raise ToolExecutionException(error_message, inner_exception=mcp_exc) from mcp_exc
except Exception as ex:
raise ToolExecutionException(f"Failed to call prompt '{prompt_name}'.", inner_exception=ex) from ex
raise ToolExecutionException(f"Failed to get prompt '{prompt_name}' after retries.")
f"Failed to call prompt '{prompt_name}' - connection lost.",
inner_exception=cl_ex,
) from cl_ex
except McpError as mcp_exc:
error_message = mcp_exc.error.message
set_mcp_span_error(span, type(mcp_exc).__name__, error_message)
raise ToolExecutionException(error_message, inner_exception=mcp_exc) from mcp_exc
except Exception as ex:
set_mcp_span_error(span, type(ex).__name__, str(ex))
raise ToolExecutionException(f"Failed to call prompt '{prompt_name}'.", inner_exception=ex) from ex
raise ToolExecutionException(f"Failed to get prompt '{prompt_name}' after retries.")

async def __aenter__(self) -> Self:
"""Enter the async context manager.
Expand Down Expand Up @@ -1621,6 +1673,11 @@ def __init__(
self.encoding = encoding
self._client_kwargs = kwargs

def _mcp_base_span_attributes(self) -> dict[str, Any]:
attrs = super()._mcp_base_span_attributes()
attrs[OtelAttr.NETWORK_TRANSPORT] = "pipe"
return attrs

def get_mcp_client(self) -> _AsyncGeneratorContextManager[Any, None]:
"""Get an MCP stdio client.

Expand Down Expand Up @@ -1761,6 +1818,24 @@ def __init__(
self._httpx_client: AsyncClient | None = http_client
self._header_provider = header_provider

def _mcp_base_span_attributes(self) -> dict[str, Any]:
attrs = super()._mcp_base_span_attributes()
attrs[OtelAttr.NETWORK_TRANSPORT] = "tcp"
attrs[OtelAttr.NETWORK_PROTOCOL_NAME] = "http"
try:
from httpx import URL

parsed = URL(self.url)
if parsed.host:
attrs[OtelAttr.ADDRESS] = parsed.host
port = parsed.port
if port is None:
port = 443 if parsed.scheme == "https" else 80
attrs[OtelAttr.PORT] = port
except Exception:
logger.debug("Failed to parse URL for MCP span transport attributes", exc_info=True)
return attrs

def get_mcp_client(self) -> _AsyncGeneratorContextManager[Any, None]:
"""Get an MCP streamable HTTP client.

Expand Down Expand Up @@ -1924,6 +1999,24 @@ def __init__(
self.url = url
self._client_kwargs = kwargs

def _mcp_base_span_attributes(self) -> dict[str, Any]:
attrs = super()._mcp_base_span_attributes()
attrs[OtelAttr.NETWORK_TRANSPORT] = "tcp"
attrs[OtelAttr.NETWORK_PROTOCOL_NAME] = "websocket"
try:
from urllib.parse import urlparse

parsed = urlparse(self.url)
if parsed.hostname:
attrs[OtelAttr.ADDRESS] = parsed.hostname
port = parsed.port
if port is None:
port = 443 if parsed.scheme == "wss" else 80
attrs[OtelAttr.PORT] = port
except Exception:
logger.debug("Failed to parse URL for MCP span transport attributes", exc_info=True)
return attrs

def get_mcp_client(self) -> _AsyncGeneratorContextManager[Any, None]:
"""Get an MCP WebSocket client.

Expand Down
66 changes: 65 additions & 1 deletion python/packages/core/agent_framework/observability.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,15 @@
"EmbeddingTelemetryLayer",
"OtelAttr",
"configure_otel_providers",
"create_mcp_client_span",
"create_metric_views",
"create_resource",
"disable_instrumentation",
"enable_instrumentation",
"enable_sensitive_telemetry",
"get_meter",
"get_tracer",
"set_mcp_span_error",
]


Expand All @@ -110,7 +112,6 @@
"inner_accumulated_usage", default=None
)


OTEL_METRICS: Final[str] = "__otel_metrics__"
TOKEN_USAGE_BUCKET_BOUNDARIES: Final[tuple[float, ...]] = (
1,
Expand Down Expand Up @@ -292,6 +293,14 @@ class OtelAttr(str, Enum):
AGENT_CREATE_OPERATION = "create_agent"
AGENT_INVOKE_OPERATION = "invoke_agent"

# MCP attributes (https://opentelemetry.io/docs/specs/semconv/gen-ai/mcp/)
MCP_METHOD_NAME = "mcp.method.name"
MCP_PROTOCOL_VERSION = "mcp.protocol.version"
MCP_SESSION_ID = "mcp.session.id"
PROMPT_NAME = "gen_ai.prompt.name"
NETWORK_TRANSPORT = "network.transport"
NETWORK_PROTOCOL_NAME = "network.protocol.name"

# Agent Framework specific attributes
MEASUREMENT_FUNCTION_TAG_NAME = "agent_framework.function.name"
MEASUREMENT_FUNCTION_INVOCATION_DURATION = "agent_framework.function.invocation.duration"
Expand Down Expand Up @@ -2013,6 +2022,61 @@ def get_function_span(
)


# region MCP span helpers


@contextlib.contextmanager
def create_mcp_client_span(
method_name: str,
target: str | None = None,
attributes: dict[str, Any] | None = None,
) -> Generator[trace.Span, Any, Any]:
"""Create an MCP client span per OTel MCP semantic conventions.

Span name follows the format ``{mcp.method.name} {target}`` when a target
is available, otherwise just ``{mcp.method.name}``.

See: https://opentelemetry.io/docs/specs/semconv/gen-ai/mcp/#client

Args:
method_name: The MCP method name (e.g. ``initialize``, ``tools/call``).
target: Optional low-cardinality target (tool name, prompt name).
attributes: Additional span attributes.
"""
span_name = f"{method_name} {target}" if target else method_name
attrs: dict[str, Any] = {OtelAttr.MCP_METHOD_NAME: method_name}
if attributes:
attrs.update(attributes)
tracer = get_tracer() if OBSERVABILITY_SETTINGS.ENABLED else trace.NoOpTracer()
span = tracer.start_span(span_name, kind=trace.SpanKind.CLIENT, attributes=attrs)
with trace.use_span(
span=span,
end_on_exit=True,
record_exception=True,
set_status_on_exception=True,
) as current_span:
yield current_span


def set_mcp_span_error(
span: trace.Span,
error_type: str,
description: str | None = None,
) -> None:
"""Set error status and ``error.type`` on an MCP span.

Args:
span: The span to mark as errored.
error_type: The error type string (e.g. ``tool_error``, exception class name).
description: Optional description (e.g. JSON-RPC error message).
"""
span.set_attribute(OtelAttr.ERROR_TYPE, error_type)
span.set_status(trace.StatusCode.ERROR, description=description)


# endregion


@contextlib.contextmanager
def _activate_span(span: trace.Span) -> Generator[None]:
"""Attach ``span`` as the current span in the OpenTelemetry context.
Expand Down
Loading
Loading