@@ -1,6 +1,7 @@
import copy
import json
import logging
import threading
import time
from functools import singledispatch
from typing import List, Optional, Union
@@ -269,7 +270,8 @@ async def _handle_request(span, kwargs, instance):
MessageEvent(
content=message.get("content"),
role=message.get("role"),
tool_calls=_parse_tool_calls(message.get("tool_calls", None)),
tool_calls=_parse_tool_calls(
message.get("tool_calls", None)),
)
)
else:
@@ -292,6 +294,7 @@ def _handle_response(
choice_counter=None,
duration_histogram=None,
duration=None,
is_streaming: bool = False,
):
if is_openai_v1():
response_dict = model_as_dict(response)
@@ -306,6 +309,7 @@
duration_histogram,
response_dict,
duration,
is_streaming,
)

# span attributes
@@ -323,13 +327,19 @@


def _set_chat_metrics(
instance, token_counter, choice_counter, duration_histogram, response_dict, duration
instance,
token_counter,
choice_counter,
duration_histogram,
response_dict,
duration,
is_streaming: bool = False,
):
shared_attributes = metric_shared_attributes(
response_model=response_dict.get("model") or None,
operation="chat",
server_address=_get_openai_base_url(instance),
is_streaming=False,
is_streaming=is_streaming,
)

# token metrics
@@ -420,7 +430,8 @@ async def _set_prompts(span, messages):
content = json.dumps(content)
_set_span_attribute(span, f"{prefix}.content", content)
if msg.get("tool_call_id"):
_set_span_attribute(span, f"{prefix}.tool_call_id", msg.get("tool_call_id"))
_set_span_attribute(
span, f"{prefix}.tool_call_id", msg.get("tool_call_id"))
tool_calls = msg.get("tool_calls")
if tool_calls:
for i, tool_call in enumerate(tool_calls):
@@ -476,9 +487,11 @@ def _set_completions(span, choices):
_set_span_attribute(span, f"{prefix}.role", message.get("role"))

if message.get("refusal"):
_set_span_attribute(span, f"{prefix}.refusal", message.get("refusal"))
_set_span_attribute(
span, f"{prefix}.refusal", message.get("refusal"))
else:
_set_span_attribute(span, f"{prefix}.content", message.get("content"))
_set_span_attribute(
span, f"{prefix}.content", message.get("content"))

function_call = message.get("function_call")
if function_call:
@@ -533,7 +546,8 @@ def _set_streaming_token_metrics(
# If API response doesn't have usage, fallback to tiktoken calculation
if prompt_usage == -1 or completion_usage == -1:
model_name = (
complete_response.get("model") or request_kwargs.get("model") or "gpt-4"
complete_response.get("model") or request_kwargs.get(
"model") or "gpt-4"
)

# Calculate prompt tokens if not available from API
@@ -543,7 +557,8 @@
if msg.get("content"):
prompt_content += msg.get("content")
if model_name and should_record_stream_token_usage():
prompt_usage = get_token_count_from_string(prompt_content, model_name)
prompt_usage = get_token_count_from_string(
prompt_content, model_name)

# Calculate completion tokens if not available from API
if completion_usage == -1 and complete_response.get("choices"):
@@ -566,7 +581,8 @@
**shared_attributes,
SpanAttributes.LLM_TOKEN_TYPE: "input",
}
token_counter.record(prompt_usage, attributes=attributes_with_token_type)
token_counter.record(
prompt_usage, attributes=attributes_with_token_type)

if isinstance(completion_usage, int) and completion_usage >= 0:
attributes_with_token_type = {
@@ -619,11 +635,34 @@ def __init__(
self._time_of_first_token = self._start_time
self._complete_response = {"choices": [], "model": ""}

# Cleanup state tracking to prevent duplicate operations
self._cleanup_completed = False
self._cleanup_lock = threading.Lock()

def __del__(self):
"""Cleanup when object is garbage collected"""
if hasattr(self, '_cleanup_completed') and not self._cleanup_completed:
self._ensure_cleanup()
Bug: Destructor Calls Risky Cleanup Methods

The ChatStream.__del__ method attempts complex cleanup by calling _ensure_cleanup(). Because object destruction during garbage collection and interpreter shutdown happens in an unpredictable order, the cleanup path can raise AttributeError or other exceptions when it touches instance attributes (e.g., self._span, self._cleanup_lock) or module globals (e.g., logger) that have already been torn down. The result can be incomplete resource cleanup or spurious warnings, even with the @dont_throw decorator.
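
A minimal sketch of a more defensive destructor, assuming the attribute names from this diff; every access is guarded with getattr and the body never raises, since instance state and module globals may already be gone at interpreter shutdown:

```python
def __del__(self):
    # During GC or interpreter shutdown, instance attributes and module
    # globals may already be deallocated; guard every access and keep
    # the teardown minimal instead of delegating to _ensure_cleanup().
    try:
        if getattr(self, "_cleanup_completed", True):
            return
        span = getattr(self, "_span", None)
        if span is not None and span.is_recording():
            span.end()
    except Exception:
        pass  # a destructor must never raise
```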



def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.__wrapped__.__exit__(exc_type, exc_val, exc_tb)
cleanup_exception = None
try:
self._ensure_cleanup()
except Exception as e:
cleanup_exception = e
# Don't re-raise to avoid masking original exception

result = self.__wrapped__.__exit__(exc_type, exc_val, exc_tb)

if cleanup_exception:
# Log cleanup exception but don't affect context manager behavior
logger.debug(
"Error during ChatStream cleanup in __exit__: %s", cleanup_exception)

return result

async def __aenter__(self):
return self
@@ -643,6 +682,11 @@ def __next__(self):
except Exception as e:
if isinstance(e, StopIteration):
self._process_complete_response()
else:
# Handle cleanup for other exceptions during stream iteration
self._ensure_cleanup()
if self._span and self._span.is_recording():
self._span.set_status(Status(StatusCode.ERROR, str(e)))
Bug: Exception Handling Fails to Set Span Status

When an exception occurs during streaming iteration in ChatStream's __next__ and __anext__ methods, _ensure_cleanup() runs first and ends the span with an OK status. The subsequent set_status(ERROR) call is a no-op because an ended span can no longer be modified, so the span reports OK despite the exception. The error status must be set before cleanup ends the span.
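
A sketch of the ordering the comment calls for, reusing the exception handler from the diff; the ERROR status is recorded while the span is still recording, and only then does cleanup end the span:

```python
except Exception as e:
    if isinstance(e, StopIteration):
        self._process_complete_response()
    else:
        # Record the error while the span is still live; once
        # _ensure_cleanup() ends the span, set_status() is a no-op.
        if self._span and self._span.is_recording():
            self._span.set_status(Status(StatusCode.ERROR, str(e)))
        self._ensure_cleanup()
    raise
```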


raise
else:
self._process_item(chunk)
@@ -654,13 +698,19 @@ async def __anext__(self):
except Exception as e:
if isinstance(e, StopAsyncIteration):
self._process_complete_response()
else:
# Handle cleanup for other exceptions during stream iteration
self._ensure_cleanup()
if self._span and self._span.is_recording():
self._span.set_status(Status(StatusCode.ERROR, str(e)))
raise
else:
self._process_item(chunk)
return chunk

def _process_item(self, item):
self._span.add_event(name=f"{SpanAttributes.LLM_CONTENT_COMPLETION_CHUNK}")
self._span.add_event(
name=f"{SpanAttributes.LLM_CONTENT_COMPLETION_CHUNK}")

if self._first_token and self._streaming_time_to_first_token:
self._time_of_first_token = time.time()
@@ -721,10 +771,82 @@ def _process_complete_response(self):
emit_event(_parse_choice_event(choice))
else:
if should_send_prompts():
_set_completions(self._span, self._complete_response.get("choices"))
_set_completions(
self._span, self._complete_response.get("choices"))

self._span.set_status(Status(StatusCode.OK))
self._span.end()
self._cleanup_completed = True

@dont_throw
def _ensure_cleanup(self):
"""Thread-safe cleanup method that handles different cleanup scenarios"""
with self._cleanup_lock:
if self._cleanup_completed:
logger.debug("ChatStream cleanup already completed, skipping")
return

try:
logger.debug("Starting ChatStream cleanup")

# Calculate partial metrics based on available data
self._record_partial_metrics()

# Set span status and close it
if self._span and self._span.is_recording():
self._span.set_status(Status(StatusCode.OK))
self._span.end()
logger.debug("ChatStream span closed successfully")

self._cleanup_completed = True
logger.debug("ChatStream cleanup completed successfully")

except Exception as e:
# Log cleanup errors but don't propagate to avoid masking original issues
logger.debug("Error during ChatStream cleanup: %s", str(e))

# Still try to close the span even if metrics recording failed
try:
if self._span and self._span.is_recording():
self._span.set_status(
Status(StatusCode.ERROR, "Cleanup failed"))
self._span.end()
self._cleanup_completed = True
except Exception:
# Final fallback - just mark as completed to prevent infinite loops
self._cleanup_completed = True

@dont_throw
def _record_partial_metrics(self):
"""Record metrics based on available partial data"""
# Always record duration if we have start time
if self._start_time and isinstance(self._start_time, (float, int)) and self._duration_histogram:
duration = time.time() - self._start_time
self._duration_histogram.record(
duration, attributes=self._shared_attributes()
)

# Record basic span attributes even without complete response
if self._span and self._span.is_recording():
_set_response_attributes(self._span, self._complete_response)

# Record partial token metrics if we have any data
if self._complete_response.get("choices") or self._request_kwargs:
_set_streaming_token_metrics(
self._request_kwargs,
self._complete_response,
self._span,
self._token_counter,
self._shared_attributes(),
)

# Record choice metrics if we have any choices processed
if self._choice_counter and self._complete_response.get("choices"):
_set_choice_counter_metrics(
self._choice_counter,
self._complete_response.get("choices"),
self._shared_attributes(),
)


# Backward compatibility with OpenAI v0
@@ -755,7 +877,8 @@ def _build_from_streaming_response(

if first_token and streaming_time_to_first_token:
time_of_first_token = time.time()
streaming_time_to_first_token.record(time_of_first_token - start_time)
streaming_time_to_first_token.record(
time_of_first_token - start_time)
first_token = False

_accumulate_stream_items(item, complete_response)
@@ -825,7 +948,8 @@ async def _abuild_from_streaming_response(

if first_token and streaming_time_to_first_token:
time_of_first_token = time.time()
streaming_time_to_first_token.record(time_of_first_token - start_time)
streaming_time_to_first_token.record(
time_of_first_token - start_time)
first_token = False

_accumulate_stream_items(item, complete_response)
@@ -943,7 +1067,8 @@ def _(choice: dict) -> ChoiceEvent:

content = choice.get("message").get("content", "") if has_message else None
role = choice.get("message").get("role") if has_message else "unknown"
finish_reason = choice.get("finish_reason") if has_finish_reason else "unknown"
finish_reason = choice.get(
"finish_reason") if has_finish_reason else "unknown"

if has_tool_calls and has_function_call:
tool_calls = message.get("tool_calls") + [message.get("function_call")]
@@ -982,7 +1107,8 @@ def _accumulate_stream_items(item, complete_response):

# prompt filter results
if item.get("prompt_filter_results"):
complete_response["prompt_filter_results"] = item.get("prompt_filter_results")
complete_response["prompt_filter_results"] = item.get(
"prompt_filter_results")

for choice in item.get("choices"):
index = choice.get("index")
@@ -1029,4 +1155,5 @@ def _accumulate_stream_items(item, complete_response):
if tool_call_function and tool_call_function.get("name"):
span_function["name"] = tool_call_function.get("name")
if tool_call_function and tool_call_function.get("arguments"):
span_function["arguments"] += tool_call_function.get("arguments")
span_function["arguments"] += tool_call_function.get(
"arguments")