Skip to content
9 changes: 7 additions & 2 deletions sentry_sdk/integrations/openai_agents/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
_create_get_model_wrapper,
_create_get_all_tools_wrapper,
_create_run_wrapper,
_create_run_streamed_wrapper,
_patch_agent_run,
_patch_error_tracing,
)
Expand All @@ -25,12 +26,16 @@ def _patch_runner() -> None:
# Create the root span for one full agent run (including eventual handoffs)
# Note agents.run.DEFAULT_AGENT_RUNNER.run_sync is a wrapper around
# agents.run.DEFAULT_AGENT_RUNNER.run. It does not need to be wrapped separately.
# TODO-anton: Also patch streaming runner: agents.Runner.run_streamed
agents.run.DEFAULT_AGENT_RUNNER.run = _create_run_wrapper(
agents.run.DEFAULT_AGENT_RUNNER.run
)

# Creating the actual spans for each agent run.
# Patch streaming runner
agents.run.DEFAULT_AGENT_RUNNER.run_streamed = _create_run_streamed_wrapper(
agents.run.DEFAULT_AGENT_RUNNER.run_streamed
)

# Creating the actual spans for each agent run (works for both streaming and non-streaming).
_patch_agent_run()


Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from .models import _create_get_model_wrapper # noqa: F401
from .tools import _create_get_all_tools_wrapper # noqa: F401
from .runner import _create_run_wrapper # noqa: F401
from .runner import _create_run_wrapper, _create_run_streamed_wrapper # noqa: F401
from .agent_run import _patch_agent_run # noqa: F401
from .error_tracing import _patch_error_tracing # noqa: F401
140 changes: 110 additions & 30 deletions sentry_sdk/integrations/openai_agents/patches/agent_run.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import sys
from functools import wraps

from sentry_sdk.consts import SPANDATA
from sentry_sdk.integrations import DidNotEnable
from sentry_sdk.utils import reraise
from ..spans import (
Expand Down Expand Up @@ -31,22 +32,10 @@ def _patch_agent_run() -> None:

# Store original methods
original_run_single_turn = agents.run.AgentRunner._run_single_turn
original_run_single_turn_streamed = agents.run.AgentRunner._run_single_turn_streamed
original_execute_handoffs = agents._run_impl.RunImpl.execute_handoffs
original_execute_final_output = agents._run_impl.RunImpl.execute_final_output

def _start_invoke_agent_span(
    context_wrapper: "agents.RunContextWrapper",
    agent: "agents.Agent",
    kwargs: "dict[str, Any]",
) -> "Span":
    """Open an invoke-agent span and remember it on the context wrapper.

    Both the agent and the freshly created span are stashed on
    ``context_wrapper`` so later hooks can locate and close them.
    """
    # Remember which agent this span belongs to for later lookups.
    context_wrapper._sentry_current_agent = agent
    new_span = invoke_agent_span(context_wrapper, agent, kwargs)
    context_wrapper._sentry_agent_span = new_span
    return new_span

def _has_active_agent_span(context_wrapper: "agents.RunContextWrapper") -> bool:
"""Check if there's an active agent span for this context"""
return getattr(context_wrapper, "_sentry_current_agent", None) is not None
Expand All @@ -57,6 +46,46 @@ def _get_current_agent(
"""Get the current agent from context wrapper"""
return getattr(context_wrapper, "_sentry_current_agent", None)

def _close_streaming_workflow_span(agent: "Optional[agents.Agent]") -> None:
"""Close the workflow span for streaming executions if it exists."""
if agent and hasattr(agent, "_sentry_workflow_span"):
workflow_span = agent._sentry_workflow_span
workflow_span.__exit__(*sys.exc_info())
delattr(agent, "_sentry_workflow_span")

def _maybe_start_agent_span(
    context_wrapper: "agents.RunContextWrapper",
    agent: "agents.Agent",
    should_run_agent_start_hooks: bool,
    span_kwargs: "dict[str, Any]",
    is_streaming: bool = False,
) -> "Optional[Span]":
    """Start an invoke-agent span when this turn actually starts an agent.

    If a span for a *different* agent is still open on the context, it is
    closed first. When the start conditions are not met, the span already
    tracked on the context (if any) is returned unchanged.
    """
    can_start = bool(should_run_agent_start_hooks and agent and context_wrapper)
    if not can_start:
        # Not an agent start: hand back whatever span is already tracked.
        return getattr(context_wrapper, "_sentry_agent_span", None)

    if _has_active_agent_span(context_wrapper):
        previous_agent = _get_current_agent(context_wrapper)
        if previous_agent and previous_agent != agent:
            # A different agent was running (handoff): finish its span first.
            end_invoke_agent_span(context_wrapper, previous_agent)

    # Track the agent and span on both the context wrapper and the agent
    # itself so later hooks can locate the span from either object.
    context_wrapper._sentry_current_agent = agent
    new_span = invoke_agent_span(context_wrapper, agent, span_kwargs)
    context_wrapper._sentry_agent_span = new_span
    agent._sentry_agent_span = new_span

    if is_streaming:
        new_span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, True)

    return new_span

@wraps(
original_run_single_turn.__func__
if hasattr(original_run_single_turn, "__func__")
Expand All @@ -68,28 +97,18 @@ async def patched_run_single_turn(
"""Patched _run_single_turn that creates agent invocation spans"""
agent = kwargs.get("agent")
context_wrapper = kwargs.get("context_wrapper")
should_run_agent_start_hooks = kwargs.get("should_run_agent_start_hooks")
should_run_agent_start_hooks = kwargs.get("should_run_agent_start_hooks", False)

span = getattr(context_wrapper, "_sentry_agent_span", None)
# Start agent span when agent starts (but only once per agent)
if should_run_agent_start_hooks and agent and context_wrapper:
# End any existing span for a different agent
if _has_active_agent_span(context_wrapper):
current_agent = _get_current_agent(context_wrapper)
if current_agent and current_agent != agent:
end_invoke_agent_span(context_wrapper, current_agent)
span = _maybe_start_agent_span(
context_wrapper, agent, should_run_agent_start_hooks, kwargs
)

span = _start_invoke_agent_span(context_wrapper, agent, kwargs)
agent._sentry_agent_span = span

# Call original method with all the correct parameters
try:
result = await original_run_single_turn(*args, **kwargs)
except Exception as exc:
if span is not None and span.timestamp is None:
_record_exception_on_span(span, exc)
end_invoke_agent_span(context_wrapper, agent)

reraise(*sys.exc_info())

return result
Expand Down Expand Up @@ -117,7 +136,9 @@ async def patched_execute_handoffs(
# Call original method with all parameters
try:
result = await original_execute_handoffs(*args, **kwargs)

except Exception:
_close_streaming_workflow_span(agent)
raise
finally:
# End span for current agent after handoff processing is complete
if agent and context_wrapper and _has_active_agent_span(context_wrapper):
Expand All @@ -139,18 +160,77 @@ async def patched_execute_final_output(
context_wrapper = kwargs.get("context_wrapper")
final_output = kwargs.get("final_output")

# Call original method with all parameters
try:
result = await original_execute_final_output(*args, **kwargs)
finally:
# End span for current agent after final output processing is complete
if agent and context_wrapper and _has_active_agent_span(context_wrapper):
end_invoke_agent_span(context_wrapper, agent, final_output)
# For streaming, close the workflow span (non-streaming uses context manager in _create_run_wrapper)
_close_streaming_workflow_span(agent)

return result

@wraps(
original_run_single_turn_streamed.__func__
if hasattr(original_run_single_turn_streamed, "__func__")
else original_run_single_turn_streamed
)
async def patched_run_single_turn_streamed(
cls: "agents.Runner", *args: "Any", **kwargs: "Any"
) -> "Any":
"""Patched _run_single_turn_streamed that creates agent invocation spans for streaming.

Note: Unlike _run_single_turn which uses keyword-only arguments (*,),
_run_single_turn_streamed uses positional arguments. The call signature is:
_run_single_turn_streamed(
streamed_result, # args[0]
agent, # args[1]
hooks, # args[2]
context_wrapper, # args[3]
run_config, # args[4]
should_run_agent_start_hooks, # args[5]
tool_use_tracker, # args[6]
all_tools, # args[7]
server_conversation_tracker, # args[8] (optional)
)
"""
# Arguments may arrive positionally or by keyword; fall back to kwargs
# when the positional slot is absent.
streamed_result = args[0] if len(args) > 0 else kwargs.get("streamed_result")
agent = args[1] if len(args) > 1 else kwargs.get("agent")
context_wrapper = args[3] if len(args) > 3 else kwargs.get("context_wrapper")
should_run_agent_start_hooks = bool(
args[5]
if len(args) > 5
else kwargs.get("should_run_agent_start_hooks", False)
)

# Forward the original run input to the span, when the streamed result
# exposes it (presumably it always does on RunResultStreaming — TODO confirm).
span_kwargs: "dict[str, Any]" = {}
if streamed_result and hasattr(streamed_result, "input"):
span_kwargs["original_input"] = streamed_result.input

# Returns the newly started span, or the already-tracked one when the
# start conditions are not met.
span = _maybe_start_agent_span(
context_wrapper,
agent,
should_run_agent_start_hooks,
span_kwargs,
is_streaming=True,
)

try:
result = await original_run_single_turn_streamed(*args, **kwargs)
except Exception as exc:
# Only record on a span that has not already been finished
# (timestamp is set when a span ends).
if span is not None and span.timestamp is None:
_record_exception_on_span(span, exc)
end_invoke_agent_span(context_wrapper, agent)
# Streaming runs own their workflow span; close it on failure.
_close_streaming_workflow_span(agent)
reraise(*sys.exc_info())

return result

# Apply patches
agents.run.AgentRunner._run_single_turn = classmethod(patched_run_single_turn)
agents.run.AgentRunner._run_single_turn_streamed = classmethod(
patched_run_single_turn_streamed
)
agents._run_impl.RunImpl.execute_handoffs = classmethod(patched_execute_handoffs)
agents._run_impl.RunImpl.execute_final_output = classmethod(
patched_execute_final_output
Expand Down
79 changes: 65 additions & 14 deletions sentry_sdk/integrations/openai_agents/patches/models.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import copy
import sys
from functools import wraps

from sentry_sdk.integrations import DidNotEnable
Expand All @@ -9,15 +10,24 @@
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from typing import Any, Callable

from typing import Any, Callable, Optional

try:
import agents
except ImportError:
raise DidNotEnable("OpenAI Agents not installed")


def _set_response_model_on_agent_span(
    agent: "agents.Agent", response_model: "Optional[str]"
) -> None:
    """Record *response_model* on the agent's invoke_agent span, when possible.

    No-op when the model name is falsy or when no span has been stashed
    on the agent (e.g. the span was never started).
    """
    if not response_model:
        return
    span = getattr(agent, "_sentry_agent_span", None)
    if not span:
        return
    span.set_data(SPANDATA.GEN_AI_RESPONSE_MODEL, response_model)


def _create_get_model_wrapper(
original_get_model: "Callable[..., Any]",
) -> "Callable[..., Any]":
Expand All @@ -37,15 +47,19 @@ def wrapped_get_model(
# because we only patch its direct methods, all underlying data can remain unchanged.
model = copy.copy(original_get_model(agent, run_config))

# Wrap _fetch_response if it exists (for OpenAI models) to capture raw response model
# Capture the request model name for spans (agent.model can be None when using defaults)
request_model_name = model.model if hasattr(model, "model") else str(model)
agent._sentry_request_model = request_model_name

# Wrap _fetch_response if it exists (for OpenAI models) to capture response model
if hasattr(model, "_fetch_response"):
original_fetch_response = model._fetch_response

@wraps(original_fetch_response)
async def wrapped_fetch_response(*args: "Any", **kwargs: "Any") -> "Any":
    """Capture the provider-reported model name from the raw response.

    Stores it on the agent as ``_sentry_response_model`` so the
    surrounding ``get_response`` wrapper can attach it to spans.
    Fix: the block contained two conflicting assignments (a stale
    ``_sentry_raw_response_model`` write alongside the new
    ``_sentry_response_model`` one); only the attribute the rest of the
    code reads is kept, guarded on truthiness so an empty model string
    is not recorded.
    """
    response = await original_fetch_response(*args, **kwargs)
    if hasattr(response, "model") and response.model:
        agent._sentry_response_model = str(response.model)
    return response

model._fetch_response = wrapped_fetch_response
Expand All @@ -57,22 +71,59 @@ async def wrapped_get_response(*args: "Any", **kwargs: "Any") -> "Any":
with ai_client_span(agent, kwargs) as span:
result = await original_get_response(*args, **kwargs)

response_model = getattr(agent, "_sentry_raw_response_model", None)
# Get response model captured from _fetch_response and clean up
response_model = getattr(agent, "_sentry_response_model", None)
if response_model:
agent_span = getattr(agent, "_sentry_agent_span", None)
if agent_span:
agent_span.set_data(
SPANDATA.GEN_AI_RESPONSE_MODEL, response_model
)
delattr(agent, "_sentry_response_model")

delattr(agent, "_sentry_raw_response_model")

update_ai_client_span(span, agent, kwargs, result, response_model)
_set_response_model_on_agent_span(agent, response_model)
update_ai_client_span(span, result, response_model)

return result

model.get_response = wrapped_get_response

# Also wrap stream_response for streaming support
if hasattr(model, "stream_response"):
original_stream_response = model.stream_response

@wraps(original_stream_response)
async def wrapped_stream_response(*args: "Any", **kwargs: "Any") -> "Any":
"""Wrap ``stream_response`` in an ai_client span tagged as streaming.

Yields the provider events unchanged; on completion, enriches the span
with the final response's usage/output/model data.
"""
# Uses explicit try/finally instead of context manager to ensure cleanup
# even if the consumer abandons the stream (GeneratorExit).
# Positional args mirror the wrapped signature: args[0] is the system
# instructions, args[1] the model input — TODO confirm against the SDK.
span_kwargs = dict(kwargs)
if len(args) > 0:
span_kwargs["system_instructions"] = args[0]
if len(args) > 1:
span_kwargs["input"] = args[1]

# Enter the span manually; it is exited in the finally below.
span = ai_client_span(agent, span_kwargs)
span.__enter__()
span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, True)

streaming_response = None
try:
async for event in original_stream_response(*args, **kwargs):
# Capture the full response from ResponseCompletedEvent
if hasattr(event, "response"):
streaming_response = event.response
yield event

# Update span with response data (usage, output, model)
if streaming_response:
response_model = (
str(streaming_response.model)
if hasattr(streaming_response, "model")
and streaming_response.model
else None
)
_set_response_model_on_agent_span(agent, response_model)
update_ai_client_span(span, streaming_response)
finally:
# Close the span with whatever exception (if any) is in flight.
span.__exit__(*sys.exc_info())

model.stream_response = wrapped_stream_response

return model

return wrapped_get_model
Loading
Loading