Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import time
import json
import threading
import weakref
from typing import Collection
from wrapt import wrap_function_wrapper
from opentelemetry.trace import SpanKind, get_tracer, Tracer, set_span_in_context
Expand All @@ -23,14 +24,51 @@
)
from .utils import set_span_attribute, JSONEncoder
from agents import FunctionTool, WebSearchTool, FileSearchTool, ComputerTool
from agents.tracing.scope import Scope


_instruments = ("openai-agents >= 0.0.19",)

_root_span_storage = {}
_storage_lock = threading.RLock()
_instrumented_tools = set()


def _get_or_set_root_span_context(span=None):
"""Get root span context using scope-based trace_id approach.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Once a root span is registered (via _get_or_set_root_span_context), subsequent calls with a new span parameter are ignored. Document this behavior clearly in the function's docstring.

Args:
span: Current span to potentially set as root span

Returns:
context: The appropriate context with root span set
"""
current_trace = Scope.get_current_trace()

if current_trace and current_trace.trace_id != "no-op":
trace_id = current_trace.trace_id

with _storage_lock:
weak_ref = _root_span_storage.get(trace_id)
root_span = weak_ref() if weak_ref else None

if root_span:
return set_span_in_context(root_span, context.get_current())
else:
ctx = context.get_current()
if span:
def cleanup_callback(ref):
with _storage_lock:
if _root_span_storage.get(trace_id) is ref:
del _root_span_storage[trace_id]

_root_span_storage[trace_id] = weakref.ref(span, cleanup_callback)
return set_span_in_context(span, ctx)
return ctx
else:
return context.get_current()

Comment thread
coderabbitai[bot] marked this conversation as resolved.

class OpenAIAgentsInstrumentor(BaseInstrumentor):
"""An instrumentor for OpenAI Agents SDK."""

Expand Down Expand Up @@ -118,14 +156,8 @@ async def _wrap_agent_run_streamed(
return await wrapped(*args, **kwargs)

agent_name = getattr(agent, "name", "agent")
thread_id = threading.get_ident()

root_span = _root_span_storage.get(thread_id)

if root_span:
ctx = set_span_in_context(root_span, context.get_current())
else:
ctx = context.get_current()
ctx = _get_or_set_root_span_context()

with tracer.start_as_current_span(
f"{agent_name}.agent",
Expand All @@ -136,8 +168,7 @@ async def _wrap_agent_run_streamed(
context=ctx,
) as span:
try:
if not root_span:
_root_span_storage[thread_id] = span
ctx = _get_or_set_root_span_context(span)

extract_agent_details(agent, span)
set_model_settings_span_attributes(agent, span)
Expand Down Expand Up @@ -217,13 +248,8 @@ async def _wrap_agent_run(
prompt_list = args[2] if len(args) > 2 else None
agent_name = getattr(agent, "name", "agent")
model_name = get_model_name(agent)
thread_id = threading.get_ident()
root_span = _root_span_storage.get(thread_id)

if root_span:
ctx = set_span_in_context(root_span, context.get_current())
else:
ctx = context.get_current()
ctx = _get_or_set_root_span_context()

with tracer.start_as_current_span(
f"{agent_name}.agent",
Expand All @@ -234,8 +260,7 @@ async def _wrap_agent_run(
context=ctx,
) as span:
try:
if not root_span:
_root_span_storage[thread_id] = span
ctx = _get_or_set_root_span_context(span)

extract_agent_details(agent, span)
set_model_settings_span_attributes(agent, span)
Expand Down Expand Up @@ -391,9 +416,6 @@ def extract_run_config_details(run_config, span):

def extract_tool_details(tracer: Tracer, tools):
"""Create spans for hosted tools and wrap FunctionTool execution."""
thread_id = threading.get_ident()
root_span = _root_span_storage.get(thread_id)

for tool in tools:
if isinstance(tool, FunctionTool):
tool_id = id(tool)
Expand All @@ -407,10 +429,7 @@ def extract_tool_details(tracer: Tracer, tools):
def create_wrapped_tool(original_tool, original_func):
async def wrapped_on_invoke_tool(tool_context, args_json):
tool_name = getattr(original_tool, "name", "tool")
if root_span:
ctx = set_span_in_context(root_span, context.get_current())
else:
ctx = context.get_current()
ctx = _get_or_set_root_span_context()

with tracer.start_as_current_span(
f"{tool_name}.tool",
Expand Down Expand Up @@ -452,10 +471,7 @@ async def wrapped_on_invoke_tool(tool_context, args_json):

elif isinstance(tool, (WebSearchTool, FileSearchTool, ComputerTool)):
tool_name = type(tool).__name__
if root_span:
ctx = set_span_in_context(root_span, context.get_current())
else:
ctx = context.get_current()
ctx = _get_or_set_root_span_context()

span = tracer.start_span(
f"{tool_name}.tool",
Expand Down
Loading