diff --git a/packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/responses_wrappers.py b/packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/responses_wrappers.py
index f14572439b..302a814e29 100644
--- a/packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/responses_wrappers.py
+++ b/packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/responses_wrappers.py
@@ -3,10 +3,58 @@
 import re
 import threading
 import time
+from typing import Any, Optional, Union
 
 from openai import AsyncStream, Stream
+from openai._legacy_response import LegacyAPIResponse
+from opentelemetry import context as context_api
+from opentelemetry.instrumentation.utils import _SUPPRESS_INSTRUMENTATION_KEY
+from opentelemetry.semconv._incubating.attributes import (
+    gen_ai_attributes as GenAIAttributes,
+    openai_attributes as OpenAIAttributes,
+)
+from opentelemetry.semconv.attributes.error_attributes import ERROR_TYPE
+from opentelemetry.semconv_ai import SpanAttributes
+from opentelemetry.trace import SpanKind, Span, StatusCode, Tracer
+from typing_extensions import NotRequired
 from wrapt import ObjectProxy
 
+from opentelemetry.instrumentation.openai.shared import (
+    _extract_model_name_from_provider_format,
+    _set_request_attributes,
+    _set_span_attribute,
+    model_as_dict,
+)
+from opentelemetry.instrumentation.openai.utils import (
+    _with_tracer_wrapper,
+    dont_throw,
+    should_send_prompts,
+)
+
+
+def _get_openai_sentinel_types() -> tuple:
+    """Dynamically discover OpenAI sentinel types available in this SDK version.
+
+    OpenAI SDK uses sentinel objects (NOT_GIVEN, Omit) for unset optional parameters.
+    These types may not exist in older SDK versions, so we discover them at runtime.
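+
+    Returns:
+        A tuple of the sentinel classes that could be imported, or an empty
+        tuple if neither is available in the installed SDK.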
+ """ + sentinel_types = [] + try: + from openai import NotGiven + sentinel_types.append(NotGiven) + except ImportError: + pass + try: + from openai import Omit + sentinel_types.append(Omit) + except ImportError: + pass + return tuple(sentinel_types) + + +# Tuple of OpenAI sentinel types for isinstance() checks (empty if none available) +_OPENAI_SENTINEL_TYPES: tuple = _get_openai_sentinel_types() + # Conditional imports for backward compatibility try: from openai.types.responses import ( @@ -24,7 +72,7 @@ RESPONSES_AVAILABLE = True except ImportError: # Fallback types for older OpenAI SDK versions - from typing import Any, Dict, List, Union + from typing import Dict, List # Create basic fallback types FunctionToolParam = Dict[str, Any] @@ -37,33 +85,25 @@ ResponseOutputMessageParam = Dict[str, Any] RESPONSES_AVAILABLE = False -from openai._legacy_response import LegacyAPIResponse -from opentelemetry import context as context_api -from opentelemetry.instrumentation.utils import _SUPPRESS_INSTRUMENTATION_KEY -from opentelemetry.semconv._incubating.attributes import ( - gen_ai_attributes as GenAIAttributes, - openai_attributes as OpenAIAttributes, -) -from opentelemetry.semconv_ai import SpanAttributes -from opentelemetry.semconv.attributes.error_attributes import ERROR_TYPE -from opentelemetry.trace import SpanKind, Span, StatusCode, Tracer -from typing import Any, Optional, Union -from typing_extensions import NotRequired +SPAN_NAME = "openai.response" -from opentelemetry.instrumentation.openai.shared import ( - _extract_model_name_from_provider_format, - _set_request_attributes, - _set_span_attribute, - model_as_dict, -) -from opentelemetry.instrumentation.openai.utils import ( - _with_tracer_wrapper, - dont_throw, - should_send_prompts, -) +def _sanitize_sentinel_values(kwargs: dict) -> dict: + """Remove OpenAI sentinel values (NOT_GIVEN, Omit) from kwargs. -SPAN_NAME = "openai.response" + OpenAI SDK uses sentinel objects for unset optional parameters. + These don't have dict methods like .get(), causing errors when + code chains calls like kwargs.get("reasoning", {}).get("summary"). + + This removes sentinel values so the default (e.g., {}) is used instead + when calling .get() on the sanitized dict. + + If no sentinel types are available (older SDK), returns kwargs unchanged. 
+ """ + if not _OPENAI_SENTINEL_TYPES: + return kwargs + return {k: v for k, v in kwargs.items() + if not isinstance(v, _OPENAI_SENTINEL_TYPES)} def prepare_input_param(input_param: ResponseInputItemParam) -> ResponseInputItemParam: @@ -453,6 +493,9 @@ def responses_get_or_create_wrapper(tracer: Tracer, wrapped, instance, args, kwa return wrapped(*args, **kwargs) start_time = time.time_ns() + # Remove OpenAI sentinel values (NOT_GIVEN, Omit) to allow chained .get() calls + non_sentinel_kwargs = _sanitize_sentinel_values(kwargs) + try: response = wrapped(*args, **kwargs) if isinstance(response, Stream): @@ -461,17 +504,17 @@ def responses_get_or_create_wrapper(tracer: Tracer, wrapped, instance, args, kwa kind=SpanKind.CLIENT, start_time=start_time, ) - _set_request_attributes(span, prepare_kwargs_for_shared_attributes(kwargs), instance) + _set_request_attributes(span, prepare_kwargs_for_shared_attributes(non_sentinel_kwargs), instance) return ResponseStream( span=span, response=response, start_time=start_time, - request_kwargs=kwargs, + request_kwargs=non_sentinel_kwargs, tracer=tracer, ) except Exception as e: - response_id = kwargs.get("response_id") + response_id = non_sentinel_kwargs.get("response_id") existing_data = {} if response_id and response_id in responses: existing_data = responses[response_id].model_dump() @@ -480,34 +523,34 @@ def responses_get_or_create_wrapper(tracer: Tracer, wrapped, instance, args, kwa start_time=existing_data.get("start_time", start_time), response_id=response_id or "", input=process_input( - kwargs.get("input", existing_data.get("input", [])) + non_sentinel_kwargs.get("input", existing_data.get("input", [])) ), - instructions=kwargs.get( + instructions=non_sentinel_kwargs.get( "instructions", existing_data.get("instructions") ), - tools=get_tools_from_kwargs(kwargs) or existing_data.get("tools", []), + tools=get_tools_from_kwargs(non_sentinel_kwargs) or existing_data.get("tools", []), output_blocks=existing_data.get("output_blocks", {}), usage=existing_data.get("usage"), - output_text=kwargs.get( + output_text=non_sentinel_kwargs.get( "output_text", existing_data.get("output_text", "") ), - request_model=kwargs.get( + request_model=non_sentinel_kwargs.get( "model", existing_data.get("request_model", "") ), response_model=existing_data.get("response_model", ""), # Reasoning attributes request_reasoning_summary=( - kwargs.get("reasoning", {}).get( + non_sentinel_kwargs.get("reasoning", {}).get( "summary", existing_data.get("request_reasoning_summary") ) ), request_reasoning_effort=( - kwargs.get("reasoning", {}).get( + non_sentinel_kwargs.get("reasoning", {}).get( "effort", existing_data.get("request_reasoning_effort") ) ), - response_reasoning_effort=kwargs.get("reasoning", {}).get("effort"), - request_service_tier=kwargs.get("service_tier"), + response_reasoning_effort=non_sentinel_kwargs.get("reasoning", {}).get("effort"), + request_service_tier=non_sentinel_kwargs.get("service_tier"), response_service_tier=existing_data.get("response_service_tier"), ) except Exception: @@ -520,7 +563,7 @@ def responses_get_or_create_wrapper(tracer: Tracer, wrapped, instance, args, kwa start_time if traced_data is None else int(traced_data.start_time) ), ) - _set_request_attributes(span, prepare_kwargs_for_shared_attributes(kwargs), instance) + _set_request_attributes(span, prepare_kwargs_for_shared_attributes(non_sentinel_kwargs), instance) span.set_attribute(ERROR_TYPE, e.__class__.__name__) span.record_exception(e) span.set_status(StatusCode.ERROR, str(e)) @@ -536,7 
@@ -536,7 +579,7 @@ def responses_get_or_create_wrapper(tracer: Tracer, wrapped, instance, args, kwa
     else:
         existing_data = existing_data.model_dump()
 
-    request_tools = get_tools_from_kwargs(kwargs)
+    request_tools = get_tools_from_kwargs(non_sentinel_kwargs)
 
     merged_tools = existing_data.get("tools", []) + request_tools
 
@@ -552,28 +595,28 @@ def responses_get_or_create_wrapper(tracer: Tracer, wrapped, instance, args, kwa
         traced_data = TracedData(
             start_time=existing_data.get("start_time", start_time),
             response_id=parsed_response.id,
-            input=process_input(existing_data.get("input", kwargs.get("input"))),
-            instructions=existing_data.get("instructions", kwargs.get("instructions")),
+            input=process_input(existing_data.get("input", non_sentinel_kwargs.get("input"))),
+            instructions=existing_data.get("instructions", non_sentinel_kwargs.get("instructions")),
             tools=merged_tools if merged_tools else None,
             output_blocks={block.id: block for block in parsed_response.output}
             | existing_data.get("output_blocks", {}),
             usage=existing_data.get("usage", parsed_response.usage),
             output_text=existing_data.get("output_text", parsed_response_output_text),
-            request_model=existing_data.get("request_model", kwargs.get("model")),
+            request_model=existing_data.get("request_model", non_sentinel_kwargs.get("model")),
             response_model=existing_data.get("response_model", parsed_response.model),
             # Reasoning attributes
             request_reasoning_summary=(
-                kwargs.get("reasoning", {}).get(
+                non_sentinel_kwargs.get("reasoning", {}).get(
                     "summary", existing_data.get("request_reasoning_summary")
                 )
             ),
             request_reasoning_effort=(
-                kwargs.get("reasoning", {}).get(
+                non_sentinel_kwargs.get("reasoning", {}).get(
                     "effort", existing_data.get("request_reasoning_effort")
                 )
             ),
-            response_reasoning_effort=kwargs.get("reasoning", {}).get("effort"),
-            request_service_tier=existing_data.get("request_service_tier", kwargs.get("service_tier")),
+            response_reasoning_effort=non_sentinel_kwargs.get("reasoning", {}).get("effort"),
+            request_service_tier=existing_data.get("request_service_tier", non_sentinel_kwargs.get("service_tier")),
             response_service_tier=existing_data.get("response_service_tier", parsed_response.service_tier),
         )
         responses[parsed_response.id] = traced_data
@@ -586,7 +629,7 @@ def responses_get_or_create_wrapper(tracer: Tracer, wrapped, instance, args, kwa
             kind=SpanKind.CLIENT,
             start_time=int(traced_data.start_time),
         )
-        _set_request_attributes(span, prepare_kwargs_for_shared_attributes(kwargs), instance)
+        _set_request_attributes(span, prepare_kwargs_for_shared_attributes(non_sentinel_kwargs), instance)
         set_data_attributes(traced_data, span)
         span.end()
 
@@ -602,6 +645,9 @@ async def async_responses_get_or_create_wrapper(
         return await wrapped(*args, **kwargs)
 
     start_time = time.time_ns()
+    # Remove OpenAI sentinel values (NOT_GIVEN, Omit) to allow chained .get() calls
+    non_sentinel_kwargs = _sanitize_sentinel_values(kwargs)
+
     try:
         response = await wrapped(*args, **kwargs)
         if isinstance(response, (Stream, AsyncStream)):
@@ -610,17 +656,17 @@ async def async_responses_get_or_create_wrapper(
                 kind=SpanKind.CLIENT,
                 start_time=start_time,
             )
-            _set_request_attributes(span, prepare_kwargs_for_shared_attributes(kwargs), instance)
+            _set_request_attributes(span, prepare_kwargs_for_shared_attributes(non_sentinel_kwargs), instance)
             return ResponseStream(
                 span=span,
                 response=response,
                 start_time=start_time,
-                request_kwargs=kwargs,
+                request_kwargs=non_sentinel_kwargs,
                 tracer=tracer,
             )
 
     except Exception as e:
-        response_id = kwargs.get("response_id")
+        response_id = non_sentinel_kwargs.get("response_id")
         existing_data = {}
         if response_id and response_id in responses:
             existing_data = responses[response_id].model_dump()
@@ -629,30 +675,30 @@ async def async_responses_get_or_create_wrapper(
                 start_time=existing_data.get("start_time", start_time),
                 response_id=response_id or "",
                 input=process_input(
-                    kwargs.get("input", existing_data.get("input", []))
+                    non_sentinel_kwargs.get("input", existing_data.get("input", []))
                 ),
-                instructions=kwargs.get(
+                instructions=non_sentinel_kwargs.get(
                     "instructions", existing_data.get("instructions", "")
                 ),
-                tools=get_tools_from_kwargs(kwargs) or existing_data.get("tools", []),
+                tools=get_tools_from_kwargs(non_sentinel_kwargs) or existing_data.get("tools", []),
                 output_blocks=existing_data.get("output_blocks", {}),
                 usage=existing_data.get("usage"),
-                output_text=kwargs.get("output_text", existing_data.get("output_text")),
-                request_model=kwargs.get("model", existing_data.get("request_model")),
+                output_text=non_sentinel_kwargs.get("output_text", existing_data.get("output_text")),
+                request_model=non_sentinel_kwargs.get("model", existing_data.get("request_model")),
                 response_model=existing_data.get("response_model"),
                 # Reasoning attributes
                 request_reasoning_summary=(
-                    kwargs.get("reasoning", {}).get(
+                    non_sentinel_kwargs.get("reasoning", {}).get(
                         "summary", existing_data.get("request_reasoning_summary")
                     )
                 ),
                 request_reasoning_effort=(
-                    kwargs.get("reasoning", {}).get(
+                    non_sentinel_kwargs.get("reasoning", {}).get(
                         "effort", existing_data.get("request_reasoning_effort")
                     )
                 ),
-                response_reasoning_effort=kwargs.get("reasoning", {}).get("effort"),
-                request_service_tier=kwargs.get("service_tier"),
+                response_reasoning_effort=non_sentinel_kwargs.get("reasoning", {}).get("effort"),
+                request_service_tier=non_sentinel_kwargs.get("service_tier"),
                 response_service_tier=existing_data.get("response_service_tier"),
             )
         except Exception:
@@ -665,7 +711,7 @@ async def async_responses_get_or_create_wrapper(
                 start_time if traced_data is None else int(traced_data.start_time)
             ),
         )
-        _set_request_attributes(span, prepare_kwargs_for_shared_attributes(kwargs), instance)
+        _set_request_attributes(span, prepare_kwargs_for_shared_attributes(non_sentinel_kwargs), instance)
         span.set_attribute(ERROR_TYPE, e.__class__.__name__)
         span.record_exception(e)
         span.set_status(StatusCode.ERROR, str(e))
@@ -681,7 +727,7 @@ async def async_responses_get_or_create_wrapper(
     else:
         existing_data = existing_data.model_dump()
 
-    request_tools = get_tools_from_kwargs(kwargs)
+    request_tools = get_tools_from_kwargs(non_sentinel_kwargs)
 
     merged_tools = existing_data.get("tools", []) + request_tools
 
@@ -698,28 +744,28 @@ async def async_responses_get_or_create_wrapper(
         traced_data = TracedData(
             start_time=existing_data.get("start_time", start_time),
             response_id=parsed_response.id,
-            input=process_input(existing_data.get("input", kwargs.get("input"))),
-            instructions=existing_data.get("instructions", kwargs.get("instructions")),
+            input=process_input(existing_data.get("input", non_sentinel_kwargs.get("input"))),
+            instructions=existing_data.get("instructions", non_sentinel_kwargs.get("instructions")),
             tools=merged_tools if merged_tools else None,
             output_blocks={block.id: block for block in parsed_response.output}
             | existing_data.get("output_blocks", {}),
             usage=existing_data.get("usage", parsed_response.usage),
             output_text=existing_data.get("output_text", parsed_response_output_text),
-            request_model=existing_data.get("request_model", kwargs.get("model")),
request_model=existing_data.get("request_model", non_sentinel_kwargs.get("model")), response_model=existing_data.get("response_model", parsed_response.model), # Reasoning attributes request_reasoning_summary=( - kwargs.get("reasoning", {}).get( + non_sentinel_kwargs.get("reasoning", {}).get( "summary", existing_data.get("request_reasoning_summary") ) ), request_reasoning_effort=( - kwargs.get("reasoning", {}).get( + non_sentinel_kwargs.get("reasoning", {}).get( "effort", existing_data.get("request_reasoning_effort") ) ), - response_reasoning_effort=kwargs.get("reasoning", {}).get("effort"), - request_service_tier=existing_data.get("request_service_tier", kwargs.get("service_tier")), + response_reasoning_effort=non_sentinel_kwargs.get("reasoning", {}).get("effort"), + request_service_tier=existing_data.get("request_service_tier", non_sentinel_kwargs.get("service_tier")), response_service_tier=existing_data.get("response_service_tier", parsed_response.service_tier), ) responses[parsed_response.id] = traced_data @@ -732,7 +778,7 @@ async def async_responses_get_or_create_wrapper( kind=SpanKind.CLIENT, start_time=int(traced_data.start_time), ) - _set_request_attributes(span, prepare_kwargs_for_shared_attributes(kwargs), instance) + _set_request_attributes(span, prepare_kwargs_for_shared_attributes(non_sentinel_kwargs), instance) set_data_attributes(traced_data, span) span.end() @@ -745,6 +791,8 @@ def responses_cancel_wrapper(tracer: Tracer, wrapped, instance, args, kwargs): if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY): return wrapped(*args, **kwargs) + non_sentinel_kwargs = _sanitize_sentinel_values(kwargs) + response = wrapped(*args, **kwargs) if isinstance(response, Stream): return response @@ -757,7 +805,7 @@ def responses_cancel_wrapper(tracer: Tracer, wrapped, instance, args, kwargs): start_time=existing_data.start_time, record_exception=True, ) - _set_request_attributes(span, prepare_kwargs_for_shared_attributes(kwargs), instance) + _set_request_attributes(span, prepare_kwargs_for_shared_attributes(non_sentinel_kwargs), instance) span.record_exception(Exception("Response cancelled")) set_data_attributes(existing_data, span) span.end() @@ -772,6 +820,8 @@ async def async_responses_cancel_wrapper( if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY): return await wrapped(*args, **kwargs) + non_sentinel_kwargs = _sanitize_sentinel_values(kwargs) + response = await wrapped(*args, **kwargs) if isinstance(response, (Stream, AsyncStream)): return response @@ -784,7 +834,7 @@ async def async_responses_cancel_wrapper( start_time=existing_data.start_time, record_exception=True, ) - _set_request_attributes(span, prepare_kwargs_for_shared_attributes(kwargs), instance) + _set_request_attributes(span, prepare_kwargs_for_shared_attributes(non_sentinel_kwargs), instance) span.record_exception(Exception("Response cancelled")) set_data_attributes(existing_data, span) span.end() @@ -812,7 +862,8 @@ def __init__( super().__init__(response) self._span = span self._start_time = start_time - self._request_kwargs = request_kwargs or {} + # Filter sentinel values (defensive, in case called directly without prior filtering) + self._request_kwargs = _sanitize_sentinel_values(request_kwargs or {}) self._tracer = tracer self._traced_data = traced_data or TracedData( start_time=start_time, @@ -828,7 +879,9 @@ def __init__( request_reasoning_summary=self._request_kwargs.get("reasoning", {}).get( "summary" ), - request_reasoning_effort=self._request_kwargs.get("reasoning", {}).get("effort"), + 
+            request_reasoning_effort=self._request_kwargs.get("reasoning", {}).get(
+                "effort"
+            ),
             response_reasoning_effort=None,
             request_service_tier=self._request_kwargs.get("service_tier"),
             response_service_tier=None,
diff --git a/packages/opentelemetry-instrumentation-openai/tests/traces/test_responses.py b/packages/opentelemetry-instrumentation-openai/tests/traces/test_responses.py
index 85a0dd5028..62df5e05f7 100644
--- a/packages/opentelemetry-instrumentation-openai/tests/traces/test_responses.py
+++ b/packages/opentelemetry-instrumentation-openai/tests/traces/test_responses.py
@@ -505,3 +505,92 @@ def test_response_stream_init_with_none_tools():
     assert stream._traced_data is not None
     # Tools should be an empty list, not None
     assert stream._traced_data.tools == [] or stream._traced_data.tools is None
+
+
+def test_response_stream_init_with_not_given_reasoning():
+    """Test ResponseStream initialization when reasoning=NOT_GIVEN sentinel.
+
+    This reproduces issue #3472: the OpenAI SDK uses NOT_GIVEN/Omit sentinels for
+    unset optional parameters. Chained calls like
+    kwargs.get("reasoning", {}).get(...) fail because the sentinel is returned as
+    the key's value (instead of the {} default) and has no .get() method.
+    """
+    from unittest.mock import MagicMock
+
+    try:
+        from openai._types import NOT_GIVEN
+    except ImportError:
+        pytest.skip("NOT_GIVEN sentinel not available in this OpenAI SDK version")
+
+    from opentelemetry.instrumentation.openai.v1.responses_wrappers import (
+        ResponseStream,
+    )
+
+    mock_response = MagicMock()
+    mock_span = MagicMock()
+    mock_tracer = MagicMock()
+
+    # Simulate kwargs where reasoning is set to the NOT_GIVEN sentinel.
+    # This is what happens when client.responses.create() is called without
+    # explicitly setting the reasoning parameter.
+    request_kwargs_with_not_given = {
+        "model": "gpt-4",
+        "input": "test",
+        "reasoning": NOT_GIVEN,  # This causes AttributeError: 'NotGiven' has no 'get'
+        "stream": True,
+    }
+
+    # This should not raise AttributeError
+    stream = ResponseStream(
+        span=mock_span,
+        response=mock_response,
+        start_time=1234567890,
+        request_kwargs=request_kwargs_with_not_given,
+        tracer=mock_tracer,
+    )
+
+    assert stream is not None
+    assert stream._traced_data is not None
+    # Reasoning summary should be None when the NOT_GIVEN sentinel is passed
+    assert stream._traced_data.request_reasoning_summary is None
+
+
+def test_response_stream_init_with_omit_reasoning():
+    """Test ResponseStream initialization when reasoning=Omit() instance.
+
+    This is a variant of issue #3472 testing the Omit sentinel class.
+    """
+    from unittest.mock import MagicMock
+
+    try:
+        from openai._types import Omit
+    except ImportError:
+        pytest.skip("Omit sentinel not available in this OpenAI SDK version")
+
+    from opentelemetry.instrumentation.openai.v1.responses_wrappers import (
+        ResponseStream,
+    )
+
+    mock_response = MagicMock()
+    mock_span = MagicMock()
+    mock_tracer = MagicMock()
+
+    request_kwargs_with_omit = {
+        "model": "gpt-4",
+        "input": "test",
+        "reasoning": Omit(),  # Another sentinel type that lacks .get()
+        "stream": True,
+    }
+
+    # This should not raise AttributeError
+    stream = ResponseStream(
+        span=mock_span,
+        response=mock_response,
+        start_time=1234567890,
+        request_kwargs=request_kwargs_with_omit,
+        tracer=mock_tracer,
+    )
+
+    assert stream is not None
+    assert stream._traced_data is not None
+    assert stream._traced_data.request_reasoning_summary is None