Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 32 additions & 4 deletions python/packages/core/agent_framework/_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from __future__ import annotations

import base64
import contextlib
import json
import logging
import re
Expand Down Expand Up @@ -2890,6 +2891,7 @@ def __init__(
self._inner_stream_source: ResponseStream[Any, Any] | Awaitable[ResponseStream[Any, Any]] | None = None
self._wrap_inner: bool = False
self._map_update: Callable[[Any], UpdateT | Awaitable[UpdateT]] | None = None
self._pull_context_manager_factories: list[Callable[[], contextlib.AbstractContextManager[Any]]] = []

def map(
self,
Expand Down Expand Up @@ -3008,11 +3010,18 @@ def __aiter__(self) -> ResponseStream[UpdateT, FinalT]:
return self

async def __anext__(self) -> UpdateT:
if self._iterator is None:
stream = await self._get_stream()
self._iterator = stream.__aiter__()
try:
update: UpdateT = await self._iterator.__anext__()
with contextlib.ExitStack() as stack:
for factory in self._pull_context_manager_factories:
stack.enter_context(factory())
# Resolve the underlying stream inside the pull contexts so that any
# spans/contexts created during stream resolution (e.g. inner chat
# completion spans created on the first pull of a wrapped agent stream)
# inherit the active context (e.g. an outer agent invoke span).
if self._iterator is None:
stream = await self._get_stream()
self._iterator = stream.__aiter__()
update: UpdateT = await self._iterator.__anext__()
except StopAsyncIteration:
self._consumed = True
await self._run_cleanup_hooks()
Expand Down Expand Up @@ -3177,6 +3186,25 @@ def with_cleanup_hook(
self._cleanup_hooks.append(hook)
return self

def with_pull_context_manager(
self,
cm_factory: Callable[[], contextlib.AbstractContextManager[Any]],
) -> ResponseStream[UpdateT, FinalT]:
"""Register a context manager factory invoked around each underlying iterator pull.

The factory is called once per ``__anext__`` and the returned context manager wraps
the await of the underlying iterator. This is useful for state that needs to be
active while the inner async work runs - for example, attaching an OpenTelemetry
span to the current context so child spans created by inner code (HTTP clients,
tool execution) are correctly parented.

Because the context manager is entered and exited within the same ``__anext__``
invocation, attach/detach style operations remain symmetric in the same async
context regardless of where the stream is iterated.
"""
self._pull_context_manager_factories.append(cm_factory)
Comment thread
TaoChenOSU marked this conversation as resolved.
return self

async def _run_cleanup_hooks(self) -> None:
if self._cleanup_run:
return
Expand Down
142 changes: 98 additions & 44 deletions python/packages/core/agent_framework/observability.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from typing import TYPE_CHECKING, Any, ClassVar, Final, Generic, Literal, TypedDict, cast, overload

from dotenv import load_dotenv
from opentelemetry import context as otel_context
from opentelemetry import metrics, trace

from . import __version__ as version_info
Expand Down Expand Up @@ -1277,27 +1278,8 @@ def get_response(
)

if stream:
result_stream = cast(
ResponseStream[ChatResponseUpdate, ChatResponse[Any]],
super_get_response(
messages=messages,
stream=True,
options=opts,
compaction_strategy=compaction_strategy,
tokenizer=tokenizer,
function_invocation_kwargs=function_invocation_kwargs,
client_kwargs=merged_client_kwargs,
),
)
span = _start_streaming_span(attributes, OtelAttr.REQUEST_MODEL)

# Create span directly without trace.use_span() context attachment.
# Streaming spans are closed asynchronously in cleanup hooks, which run
# in a different async context than creation — using use_span() would
# cause "Failed to detach context" errors from OpenTelemetry.
operation = attributes.get(OtelAttr.OPERATION, "operation")
span_name = attributes.get(OtelAttr.REQUEST_MODEL, "unknown")
span = get_tracer().start_span(f"{operation} {span_name}")
span.set_attributes(attributes)
if OBSERVABILITY_SETTINGS.SENSITIVE_DATA_ENABLED and messages:
_capture_messages(
span=span,
Expand All @@ -1319,6 +1301,19 @@ def _close_span() -> None:
def _record_duration() -> None:
duration_state["duration"] = perf_counter() - start_time

result_stream = cast(
ResponseStream[ChatResponseUpdate, ChatResponse[Any]],
super_get_response(
messages=messages,
stream=True,
options=opts,
compaction_strategy=compaction_strategy,
tokenizer=tokenizer,
function_invocation_kwargs=function_invocation_kwargs,
client_kwargs=merged_client_kwargs,
),
)
Comment thread
TaoChenOSU marked this conversation as resolved.
Outdated
Comment thread
TaoChenOSU marked this conversation as resolved.
Outdated

async def _finalize_stream() -> None:
from ._types import ChatResponse

Expand Down Expand Up @@ -1357,11 +1352,18 @@ async def _finalize_stream() -> None:
finally:
_close_span()

# Register a weak reference callback to close the span if stream is garbage collected
# without being consumed. This ensures spans don't leak if users don't consume streams.
wrapped_stream: ResponseStream[ChatResponseUpdate, ChatResponse[Any]] = result_stream.with_cleanup_hook(
_record_duration
).with_cleanup_hook(_finalize_stream)
# The pull context manager attaches the span around each underlying iterator pull so
# that child spans created during the pull (e.g. HTTP requests, inner tool execution)
# are parented under this chat span. Attach and detach happen in the same async
# context as the pull, avoiding cross-context cleanup issues. The weakref finalizer
# ensures the span is closed even if the stream is garbage collected without being
# consumed.
wrapped_stream: ResponseStream[ChatResponseUpdate, ChatResponse[Any]] = (
result_stream
.with_cleanup_hook(_record_duration)
.with_cleanup_hook(_finalize_stream)
.with_pull_context_manager(lambda: _activate_span(span))
)
weakref.finalize(wrapped_stream, _close_span)
return wrapped_stream

Expand Down Expand Up @@ -1543,23 +1545,8 @@ def _trace_agent_invocation(
inner_accumulated_usage_token = INNER_ACCUMULATED_USAGE.set({})

if stream:
try:
run_result: object = execute()
if isinstance(run_result, ResponseStream):
result_stream: ResponseStream[AgentResponseUpdate, AgentResponse[Any]] = run_result # pyright: ignore[reportUnknownVariableType]
elif isinstance(run_result, Awaitable):
result_stream = ResponseStream.from_awaitable(run_result) # type: ignore[arg-type] # pyright: ignore[reportArgumentType]
else:
raise RuntimeError("Streaming telemetry requires a ResponseStream result.")
except Exception:
INNER_RESPONSE_TELEMETRY_CAPTURED_FIELDS.reset(inner_response_telemetry_captured_fields_token)
INNER_ACCUMULATED_USAGE.reset(inner_accumulated_usage_token)
raise
span = _start_streaming_span(attributes, OtelAttr.AGENT_NAME)

operation = attributes.get(OtelAttr.OPERATION, "operation")
span_name = attributes.get(OtelAttr.AGENT_NAME, "unknown")
span = get_tracer().start_span(f"{operation} {span_name}")
span.set_attributes(attributes)
if OBSERVABILITY_SETTINGS.SENSITIVE_DATA_ENABLED and messages:
_capture_messages(
span=span,
Expand All @@ -1581,6 +1568,20 @@ def _close_span() -> None:
def _record_duration() -> None:
duration_state["duration"] = perf_counter() - start_time
Comment thread
moonbox3 marked this conversation as resolved.

try:
run_result: object = execute()
if isinstance(run_result, ResponseStream):
result_stream: ResponseStream[AgentResponseUpdate, AgentResponse[Any]] = run_result # pyright: ignore[reportUnknownVariableType]
elif isinstance(run_result, Awaitable):
result_stream = ResponseStream.from_awaitable(run_result) # type: ignore[arg-type] # pyright: ignore[reportArgumentType]
else:
raise RuntimeError("Streaming telemetry requires a ResponseStream result.")
except Exception:
INNER_RESPONSE_TELEMETRY_CAPTURED_FIELDS.reset(inner_response_telemetry_captured_fields_token)
INNER_ACCUMULATED_USAGE.reset(inner_accumulated_usage_token)
_close_span()
raise
Comment thread
TaoChenOSU marked this conversation as resolved.

async def _finalize_stream() -> None:
from ._types import AgentResponse

Expand Down Expand Up @@ -1620,9 +1621,18 @@ async def _finalize_stream() -> None:
INNER_ACCUMULATED_USAGE.reset(inner_accumulated_usage_token)
_close_span()

wrapped_stream: ResponseStream[AgentResponseUpdate, AgentResponse[Any]] = result_stream.with_cleanup_hook(
_record_duration
).with_cleanup_hook(_finalize_stream)
# The pull context manager attaches the span around each underlying iterator pull so
# that child spans created during the pull (e.g. inner chat completion spans from the
# underlying ChatTelemetryLayer) are parented under this agent invoke span. Attach and
# detach happen in the same async context as the pull, avoiding cross-context cleanup
# issues. The weakref finalizer ensures the span is closed even if the stream is
# garbage collected without being consumed.
wrapped_stream: ResponseStream[AgentResponseUpdate, AgentResponse[Any]] = (
result_stream
.with_cleanup_hook(_record_duration)
.with_cleanup_hook(_finalize_stream)
.with_pull_context_manager(lambda: _activate_span(span))
)
weakref.finalize(wrapped_stream, _close_span)
return wrapped_stream

Expand Down Expand Up @@ -1809,6 +1819,27 @@ def get_function_span(
)


@contextlib.contextmanager
def _activate_span(span: trace.Span) -> Generator[None, None, None]:
"""Attach ``span`` as the current span in the OpenTelemetry context.

Designed to be used as a per-pull context manager registered on a
``ResponseStream`` via ``with_pull_context_manager``: it attaches the span
before each underlying iterator pull and detaches immediately after, so
child spans created during the pull (HTTP clients, inner chat completions,
tool execution) are correctly parented under ``span``.

Because attach and detach happen within the same ``__anext__`` invocation
(and therefore the same async task / contextvars context), there is no risk
of "Failed to detach context" warnings from cross-context cleanup.
"""
token = otel_context.attach(trace.set_span_in_context(span))
try:
yield
finally:
otel_context.detach(token)


@contextlib.contextmanager
def _get_span(
attributes: dict[str, Any],
Expand All @@ -1831,6 +1862,29 @@ def _get_span(
yield current_span


def _start_streaming_span(attributes: dict[str, Any], span_name_attribute: str) -> trace.Span:
"""Start a non-current span for a streaming operation.

Unlike :func:`_get_span`, the returned span is not attached to the current
OpenTelemetry context. The caller is responsible for:

- Ending the span via cleanup hooks on the wrapped
:class:`~agent_framework._types.ResponseStream`.
- Activating the span around each iterator pull via
:func:`_activate_span` registered with ``with_pull_context_manager`` so
that child spans created during stream production inherit it as parent.

Streaming spans are closed asynchronously in cleanup hooks that run in a
different async context than creation, so attaching the span at creation
time would cause "Failed to detach context" errors from OpenTelemetry.
"""
operation = attributes.get(OtelAttr.OPERATION, "operation")
span_name = attributes.get(span_name_attribute, "unknown")
span = get_tracer().start_span(f"{operation} {span_name}")
span.set_attributes(attributes)
return span


def _get_instructions_from_options(options: Any) -> str | list[str] | None:
"""Extract instructions from options dict."""
if options is None:
Expand Down
Loading
Loading