@@ -1 +1 @@
-3.9.5
+3.11
@@ -328,20 +328,39 @@ def _extract_model_name_from_provider_format(model_name):
     return model_name


-def is_streaming_response(response):
+def _is_legacy_api_response(obj):
+    """Check if object is a LiteLLM LegacyAPIResponse"""
+    return hasattr(obj, 'parse') and callable(getattr(obj, 'parse'))
+
+
+def is_streaming_response(response, kwargs=None):
     if is_openai_v1():
-        return isinstance(response, openai.Stream) or isinstance(
-            response, openai.AsyncStream
-        )
+        # Check if it's directly a stream
+        if isinstance(response, openai.Stream) or isinstance(response, openai.AsyncStream):
+            return True
+
+        # For LegacyAPIResponse, check if the original request was streaming
+        # Note: LegacyAPIResponse handling is now done at wrapper level
+        if _is_legacy_api_response(response):
+            if kwargs and kwargs.get('stream'):
+                return True
+
+        return False

     return isinstance(response, types.GeneratorType) or isinstance(
         response, types.AsyncGeneratorType
     )


-def model_as_dict(model):
+def model_as_dict(model, is_streaming=False):
     if isinstance(model, dict):
         return model

+    # For streaming LegacyAPIResponse, we can't extract completion data after streaming
+    # Just return empty dict to prevent crashes - completion data will come from cleanup
+    if _is_legacy_api_response(model) and is_streaming:
+        return {}
+
     if _PYDANTIC_VERSION < "2.0.0":
         return model.dict()
     if hasattr(model, "model_dump"):
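For orientation: a LegacyAPIResponse is what the OpenAI v1 SDK hands back from its raw-response surface, which is how LiteLLM calls it (note the x-stainless-raw-response: 'true' header in the cassette below). A minimal sketch of how such an object arises — the client setup and model name here are illustrative placeholders, not part of this change:

import openai

client = openai.OpenAI(api_key="sk-placeholder")  # placeholder key

# .with_raw_response returns a LegacyAPIResponse-style wrapper rather than
# a parsed object; this is the duck-typed shape _is_legacy_api_response()
# detects via its callable `parse` attribute.
raw = client.chat.completions.with_raw_response.create(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "Say hello"}],
    stream=True,
)

# The wrapper itself is not an openai.Stream, so isinstance checks alone
# miss it; only the request kwargs (stream=True) reveal that the underlying
# response is a stream, which is why is_streaming_response() now takes kwargs.
stream = raw.parse()  # yields the actual stream of completion chunks
for chunk in stream:
    print(chunk)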
@@ -11,6 +11,7 @@
 from opentelemetry.instrumentation.openai.shared import (
     OPENAI_LLM_USAGE_TOKEN_TYPES,
     _get_openai_base_url,
+    _is_legacy_api_response,
     _set_client_attributes,
     _set_functions_attributes,
     _set_request_attributes,
@@ -114,12 +115,24 @@ def chat_wrapper(

         raise

-    if is_streaming_response(response):
+    if is_streaming_response(response, kwargs):
         # span will be closed after the generator is done
         if is_openai_v1():
-            return ChatStream(
+            # Handle LegacyAPIResponse by parsing it first, without mutating original
+            actual_response = response
+            parsed_successfully = False
+            if _is_legacy_api_response(response) and kwargs.get('stream'):
+                try:
+                    actual_response = response.parse()
Contributor
In chat_wrapper, the block handling LegacyAPIResponse parsing logs a warning on failure. Consider setting a flag (e.g. _response_was_parsed) to indicate successful parsing so that downstream cleanup can update span attributes accordingly.
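For reference, a compact sketch of the suggested flag on the consuming side (this mirrors the _record_partial_metrics change further down in this diff; _response_was_parsed is the attribute name proposed above):

# At parse time (see the surrounding diff), record whether parse() succeeded:
#     stream._response_was_parsed = True
# During cleanup, branch on the flag:
if getattr(self, '_response_was_parsed', False):
    # parse() succeeded, so the underlying stream really completed;
    # mark the span OK even though no terminal chunk was observed.
    self._span.set_status(Status(StatusCode.OK))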

+                    parsed_successfully = True
+                except Exception as e:
+                    logger.warning(f"Failed to parse LegacyAPIResponse: {e}")
+                    # Fall back to original response
+                    actual_response = response
+
+            stream = ChatStream(
                 span,
-                response,
+                actual_response,
                 instance,
                 token_counter,
                 choice_counter,
@@ -129,6 +142,9 @@
                 start_time,
                 kwargs,
             )
+            if parsed_successfully:
+                stream._response_was_parsed = True
+            return stream
         else:
             return _build_from_streaming_response(
                 span,
@@ -215,12 +231,24 @@ async def achat_wrapper(

         raise

-    if is_streaming_response(response):
+    if is_streaming_response(response, kwargs):
         # span will be closed after the generator is done
         if is_openai_v1():
-            return ChatStream(
+            # Handle LegacyAPIResponse by parsing it first, without mutating original
+            actual_response = response
+            parsed_successfully = False
+            if _is_legacy_api_response(response) and kwargs.get('stream'):
+                try:
+                    actual_response = response.parse()
+                    parsed_successfully = True
+                except Exception as e:
+                    logger.warning(f"Failed to parse LegacyAPIResponse: {e}")
+                    # Fall back to original response
+                    actual_response = response
+
+            stream = ChatStream(
                 span,
-                response,
+                actual_response,
                 instance,
                 token_counter,
                 choice_counter,
@@ -230,6 +258,9 @@
                 start_time,
                 kwargs,
             )
+            if parsed_successfully:
+                stream._response_was_parsed = True
+            return stream
         else:
             return _abuild_from_streaming_response(
                 span,
@@ -298,7 +329,7 @@ def _handle_response(
     is_streaming: bool = False,
 ):
     if is_openai_v1():
-        response_dict = model_as_dict(response)
+        response_dict = model_as_dict(response, is_streaming)
     else:
         response_dict = response

@@ -609,9 +640,61 @@ def __init__(
         self._cleanup_completed = False
         self._cleanup_lock = threading.Lock()

+    def parse(self):
+        """Handle LegacyAPIResponse.parse() calls from LiteLLM"""
+        if hasattr(self.__wrapped__, 'parse'):
+            # Parse the response to get the actual stream
+            parsed_stream = self.__wrapped__.parse()
+
+            # Create new ChatStream but inherit our current response accumulation
+            new_chat_stream = ChatStream(
+                self._span,
+                parsed_stream,
+                self._instance,
+                self._token_counter,
+                self._choice_counter,
+                self._duration_histogram,
+                self._streaming_time_to_first_token,
+                self._streaming_time_to_generate,
+                self._start_time,
+                self._request_kwargs,
+            )
+
+            # Transfer any accumulated response data to the new stream
+            new_chat_stream._complete_response = self._complete_response.copy()
+
+            # Mark this stream as no longer responsible for span completion
+            # since the new stream will handle it
+            self._cleanup_completed = True
+
+            return new_chat_stream
+        else:
+            return self
+
+    def close(self):
+        """Close the stream and ensure cleanup"""
+        self._ensure_cleanup()
+        if hasattr(self.__wrapped__, 'close'):
+            return self.__wrapped__.close()
+
+    async def aclose(self):
+        """Close the async stream and ensure cleanup"""
+        self._ensure_cleanup()
+        if hasattr(self.__wrapped__, 'close'):
+            return await self.__wrapped__.close()
+
Comment on lines +680 to +685
⚠️ Potential issue

Fix async close: don’t await sync close; prefer aclose when available.

Awaiting a sync close() will raise a TypeError. Check aclose first; call sync close() without await as a fallback.

-    async def aclose(self):
-        """Close the async stream and ensure cleanup"""
-        self._ensure_cleanup()
-        if hasattr(self.__wrapped__, 'close'):
-            return await self.__wrapped__.close()
+    async def aclose(self):
+        """Close the async stream and ensure cleanup"""
+        self._ensure_cleanup()
+        if hasattr(self.__wrapped__, "aclose"):
+            return await self.__wrapped__.aclose()
+        if hasattr(self.__wrapped__, "close"):
+            # Synchronous close as a fallback; do not await
+            return self.__wrapped__.close()
🤖 Prompt for AI Agents

In packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/chat_wrappers.py around lines 670 to 675, the async aclose currently awaits a potentially synchronous close() which can raise TypeError; change the logic to prefer calling and awaiting aclose() on the wrapped object if it exists, and only call close() without await as a synchronous fallback. Implement: check for attribute 'aclose' and await it when present, else if 'close' exists call it without await; keep self._ensure_cleanup() as-is.

+    def __del__(self):
+        """Cleanup when object is garbage collected"""
+        if hasattr(self, '_cleanup_completed') and not self._cleanup_completed:
+            # If we have accumulated completion data, make sure it gets set on the span
+            if (hasattr(self, '_complete_response') and
+                (self._complete_response.get('usage') or self._complete_response.get('choices')) and
+                self._span and self._span.is_recording()):
+                _set_response_attributes(self._span, self._complete_response)
+                if should_send_prompts():
+                    _set_completions(self._span, self._complete_response.get("choices"))
+                self._span.set_status(Status(StatusCode.OK))
+
Comment on lines 686 to +697
🛠️ Refactor suggestion

Fix multiline condition indentation in __del__ and keep it flake8-compliant.

Current indentation triggers E129. Restructure the condition for clarity and lint compliance.

-        if hasattr(self, '_cleanup_completed') and not self._cleanup_completed:
-            # If we have accumulated completion data, make sure it gets set on the span
-            if (hasattr(self, '_complete_response') and 
-                (self._complete_response.get('usage') or self._complete_response.get('choices')) and 
-                self._span and self._span.is_recording()):
+        if hasattr(self, "_cleanup_completed") and not self._cleanup_completed:
+            # If we have accumulated completion data, make sure it gets set on the span
+            if (
+                hasattr(self, "_complete_response")
+                and (
+                    self._complete_response.get("usage")
+                    or self._complete_response.get("choices")
+                )
+                and self._span
+                and self._span.is_recording()
+            ):
                 _set_response_attributes(self._span, self._complete_response)
-                if should_send_prompts():
+                if should_send_prompts():
                     _set_completions(self._span, self._complete_response.get("choices"))
                 self._span.set_status(Status(StatusCode.OK))
🧰 Tools
🪛 Flake8 (7.2.0)

[error] 682-682: visually indented line with same indent as next logical line

(E129)

+        self._ensure_cleanup()

     def __enter__(self):
@@ -638,7 +721,23 @@ async def __aenter__(self):
         return self

     async def __aexit__(self, exc_type, exc_val, exc_tb):
-        await self.__wrapped__.__aexit__(exc_type, exc_val, exc_tb)
+        cleanup_exception = None
+        try:
+            self._ensure_cleanup()
+        except Exception as e:
+            cleanup_exception = e
+            # Don't re-raise to avoid masking original exception
+
+        result = False
+        if hasattr(self.__wrapped__, "__aexit__"):
+            result = await self.__wrapped__.__aexit__(exc_type, exc_val, exc_tb)
+
+        if cleanup_exception:
+            # Log cleanup exception but don't affect context manager behavior
+            logger.debug(
+                "Error during ChatStream cleanup in __aexit__: %s", cleanup_exception)
+
+        return result

     def __iter__(self):
         return self
@@ -691,6 +790,7 @@ def _process_item(self, item):
             self._first_token = False

         _accumulate_stream_items(item, self._complete_response)
+

     def _shared_attributes(self):
         return metric_shared_attributes(
@@ -789,6 +889,8 @@ def _ensure_cleanup(self):

     @dont_throw
     def _record_partial_metrics(self):
         """Record metrics based on available partial data"""
+        # Debug logging
Contributor
Remove or expand the placeholder debug logging comment in _record_partial_metrics if not needed.

Suggested change
-        # Debug logging


+        # Always record duration if we have start time
         if self._start_time and isinstance(self._start_time, (float, int)) and self._duration_histogram:
             duration = time.time() - self._start_time
@@ -799,6 +901,16 @@
         # Record basic span attributes even without complete response
         if self._span and self._span.is_recording():
             _set_response_attributes(self._span, self._complete_response)
+
+            # Also set completion attributes for any accumulated choices
+            if should_send_prompts():
+                _set_completions(self._span, self._complete_response.get("choices"))
+
+            # For LegacyAPIResponse that was successfully parsed, set status to OK
+            if getattr(self, '_response_was_parsed', False):
+                self._span.set_status(Status(StatusCode.OK))
+                # Set a basic finish reason since we know the response completed
+                self._span.set_attribute("gen_ai.response.finish_reason", "stop")

         # Record partial token metrics if we have any data
         if self._complete_response.get("choices") or self._request_kwargs:
@@ -69,7 +69,7 @@ def completion_wrapper(tracer, wrapped, instance, args, kwargs):
             span.end()
             raise

-    if is_streaming_response(response):
+    if is_streaming_response(response, kwargs):
         # span will be closed after the generator is done
         return _build_from_streaming_response(span, kwargs, response)
     else:
@@ -105,7 +105,7 @@ async def acompletion_wrapper(tracer, wrapped, instance, args, kwargs):
             span.end()
             raise

-    if is_streaming_response(response):
+    if is_streaming_response(response, kwargs):
         # span will be closed after the generator is done
         return _abuild_from_streaming_response(span, kwargs, response)
     else:
@@ -0,0 +1,86 @@
interactions:
- request:
    body: '{"messages": [{"role": "user", "content": "Say hello"}], "model": "gpt-3.5-turbo",
      "stream": true}'
    headers:
      accept:
      - application/json
      accept-encoding:
      - gzip, deflate
      connection:
      - keep-alive
      content-length:
      - '98'
      content-type:
      - application/json
      host:
      - api.openai.com
      traceparent:
      - 00-29b1f3126ef228081d7c06cb664d7944-c833f72f872ef8ab-01
      user-agent:
      - OpenAI/Python 1.93.0
      x-stainless-arch:
      - arm64
      x-stainless-async:
      - 'false'
      x-stainless-lang:
      - python
      x-stainless-os:
      - MacOS
      x-stainless-package-version:
      - 1.93.0
      x-stainless-raw-response:
      - 'true'
      x-stainless-read-timeout:
      - '600'
      x-stainless-retry-count:
      - '0'
      x-stainless-runtime:
      - CPython
      x-stainless-runtime-version:
      - 3.12.1
    method: POST
    uri: https://api.openai.com/v1/chat/completions
  response:
    body:
      string: "{\n  \"error\": {\n    \"message\": \"Incorrect API key provided:
        test_api_key. You can find your API key at https://platform.openai.com/account/api-keys.\",\n
        \   \"type\": \"invalid_request_error\",\n    \"param\": null,\n    \"code\":
        \"invalid_api_key\"\n  }\n}\n"
    headers:
      CF-RAY:
      - 970a84031b9fc222-TLV
      Connection:
      - keep-alive
      Content-Length:
      - '262'
      Content-Type:
      - application/json; charset=utf-8
      Date:
      - Sun, 17 Aug 2025 16:23:23 GMT
      Server:
      - cloudflare
      Set-Cookie:
      - __cf_bm=CnO4N7IJLSVPZ1NJ1sXGrDdWIzyN41oYvTESuNDO7T0-1755447803-1.0.1.1-6VdBrSZbrcn1jl6eSOEfCU0vIIwWabzVpC2g9rFtnJ8T06pw_RH1ussZACigLi9axHA6Rn8ObEttzPb1mOFxmMyN8jcG978QILGVfIqS2_A;
        path=/; expires=Sun, 17-Aug-25 16:53:23 GMT; domain=.api.openai.com; HttpOnly;
        Secure; SameSite=None
      - _cfuvid=OYHhC81cl5STQdGR0AnkjRK_.Pzo0gGLZWk7EwG9w88-1755447803697-0.0.1.1-604800000;
        path=/; domain=.api.openai.com; HttpOnly; Secure; SameSite=None
Comment on lines +64 to +68
🛠️ Refactor suggestion

⚠️ Potential issue

Redact Set-Cookie headers in error cassette as well.

Same concern as the streaming cassette: Set-Cookie values should be filtered.

-      Set-Cookie:
-      - __cf_bm=CnO4N7IJLSVPZ1NJ1sXGrDdWIzyN41oYvTESuNDO7T0-1755447803-1.0.1.1-6VdBrSZbrcn1jl6eSOEfCU0vIIwWabzVpC2g9rFtnJ8T06pw_RH1ussZACigLi9axHA6Rn8ObEttzPb1mOFxmMyN8jcG978QILGVfIqS2_A;
-        path=/; expires=Sun, 17-Aug-25 16:53:23 GMT; domain=.api.openai.com; HttpOnly;
-        Secure; SameSite=None
-      - _cfuvid=OYHhC81cl5STQdGR0AnkjRK_.Pzo0gGLZWk7EwG9w88-1755447803697-0.0.1.1-604800000;
-        path=/; domain=.api.openai.com; HttpOnly; Secure; SameSite=None
+      Set-Cookie:
+      - "<filtered>"
+      - "<filtered>"

If not already configured, add cassette filtering as shown in the other comment to avoid future leaks.
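One way to wire that up, as a sketch: this assumes a pytest-recording/vcrpy-style conftest.py (the vcr_config fixture name is that plugin's convention, not something introduced by this PR). Note that vcrpy's filter_headers option only sanitizes request headers, so response headers like Set-Cookie need a before_record_response hook:

import pytest


def scrub_response_headers(response):
    # filter_headers does not touch response headers, so Set-Cookie
    # is redacted here before the cassette is written to disk.
    if "Set-Cookie" in response.get("headers", {}):
        response["headers"]["Set-Cookie"] = ["<filtered>"]
    return response


@pytest.fixture(scope="module")
def vcr_config():
    return {
        "filter_headers": ["authorization"],  # request-side secrets
        "before_record_response": scrub_response_headers,
    }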

🤖 Prompt for AI Agents

In packages/opentelemetry-instrumentation-openai/tests/traces/cassettes/test_legacy_api_response/test_raw_response_header_completion_attributes.yaml around lines 64-68, the cassette contains raw Set-Cookie header values that must be redacted; update this cassette to remove or replace the Set-Cookie header values (e.g., replace cookie strings with a fixed placeholder or remove the header entirely) and apply the same cassette filtering configuration used for the streaming cassette so future recordings are automatically sanitized; after making the change, run the test suite to confirm no other cassettes leak cookie values.

      Strict-Transport-Security:
      - max-age=31536000; includeSubDomains; preload
      X-Content-Type-Options:
      - nosniff
      alt-svc:
      - h3=":443"; ma=86400
      cf-cache-status:
      - DYNAMIC
      vary:
      - Origin
      x-envoy-upstream-service-time:
      - '1'
      x-request-id:
      - req_a314fe776829421484a993c64b674cee
    status:
      code: 401
      message: Unauthorized
version: 1