Merged

Changes from all commits
@@ -48,6 +48,7 @@
SpanAttributes,
)
from opentelemetry.trace import SpanKind, Tracer
from opentelemetry import trace
from opentelemetry.trace.status import Status, StatusCode
from wrapt import ObjectProxy

@@ -86,75 +87,77 @@ def chat_wrapper(
attributes={SpanAttributes.LLM_REQUEST_TYPE: LLM_REQUEST_TYPE.value},
)

run_async(_handle_request(span, kwargs, instance))
try:
start_time = time.time()
response = wrapped(*args, **kwargs)
end_time = time.time()
except Exception as e: # pylint: disable=broad-except
end_time = time.time()
duration = end_time - start_time if "start_time" in locals() else 0

attributes = {
"error.type": e.__class__.__name__,
}

if duration > 0 and duration_histogram:
duration_histogram.record(duration, attributes=attributes)
if exception_counter:
exception_counter.add(1, attributes=attributes)

span.set_attribute(ERROR_TYPE, e.__class__.__name__)
span.record_exception(e)
span.set_status(Status(StatusCode.ERROR, str(e)))
span.end()
# Use the span as current context to ensure events get proper trace context
with trace.use_span(span, end_on_exit=False):
run_async(_handle_request(span, kwargs, instance))
try:
start_time = time.time()
response = wrapped(*args, **kwargs)
Comment on lines +90 to +95

🛠️ Refactor suggestion

⚠️ Potential issue
Context not propagated into new thread used by run_async; emit_event may still miss span_id

run_async executes the coroutine in a new thread when a loop is running. The current runtime context (and thus the current span set by trace.use_span) is not automatically propagated to that thread. emit_event inside _handle_request will then lack a current span.
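A minimal repro of that behavior (sketch, not PR code; assumes the OTel SDK is installed — OTel context lives in a ContextVar, and threading.Thread does not copy contextvars into the new thread):

import threading

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider

trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)

def worker():
    # No context was attached in this thread, so the current span is the
    # invalid NonRecordingSpan (span_id == 0), not "parent".
    span_ctx = trace.get_current_span().get_span_context()
    print(f"in thread: span_id={span_ctx.span_id:x}")

with tracer.start_as_current_span("parent"):
    t = threading.Thread(target=worker)
    t.start()
    t.join()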

Fix either by propagating the context into the thread here, or by updating utils.run_async to attach a captured context.

Minimal in-place fix:

-        run_async(_handle_request(span, kwargs, instance))
+        # Ensure current runtime context (with the span) flows into the helper thread
+        current_ctx = context_api.get_current()
+        def _runner():
+            token = context_api.attach(current_ctx)
+            try:
+                import asyncio
+                asyncio.run(_handle_request(span, kwargs, instance))
+            finally:
+                context_api.detach(token)
+        import threading
+        t = threading.Thread(target=_runner)
+        t.start()
+        t.join()

Alternatively, enhance utils.run_async to accept and attach a context (preferred, single place to maintain).
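A sketch of that preferred shape (the signature and the thread/loop handling here are assumptions about utils.run_async, not its actual code — only the attach/detach pattern is the point):

import asyncio
import threading

from opentelemetry import context as context_api

def run_async(coro, ctx=None):
    # Capture the caller's context (including the current span) by default.
    ctx = ctx if ctx is not None else context_api.get_current()

    def _runner():
        token = context_api.attach(ctx)  # make the caller's span current here
        try:
            asyncio.run(coro)
        finally:
            context_api.detach(token)

    try:
        asyncio.get_running_loop()
    except RuntimeError:
        _runner()  # no loop running: safe to run inline
    else:
        # A loop is already running: block in a helper thread rather than
        # deadlocking the running loop.
        t = threading.Thread(target=_runner)
        t.start()
        t.join()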

Committable suggestion skipped: line range outside the PR's diff.

end_time = time.time()
except Exception as e: # pylint: disable=broad-except
end_time = time.time()
duration = end_time - start_time if "start_time" in locals() else 0

attributes = {
"error.type": e.__class__.__name__,
}

raise
if duration > 0 and duration_histogram:
duration_histogram.record(duration, attributes=attributes)
if exception_counter:
exception_counter.add(1, attributes=attributes)

if is_streaming_response(response):
# span will be closed after the generator is done
if is_openai_v1():
return ChatStream(
span,
response,
instance,
token_counter,
choice_counter,
duration_histogram,
streaming_time_to_first_token,
streaming_time_to_generate,
start_time,
kwargs,
)
else:
return _build_from_streaming_response(
span,
response,
instance,
token_counter,
choice_counter,
duration_histogram,
streaming_time_to_first_token,
streaming_time_to_generate,
start_time,
kwargs,
)
span.set_attribute(ERROR_TYPE, e.__class__.__name__)
span.record_exception(e)
span.set_status(Status(StatusCode.ERROR, str(e)))
span.end()

duration = end_time - start_time
raise

_handle_response(
response,
span,
instance,
token_counter,
choice_counter,
duration_histogram,
duration,
)
if is_streaming_response(response):
# span will be closed after the generator is done
if is_openai_v1():
return ChatStream(
span,
response,
instance,
token_counter,
choice_counter,
duration_histogram,
streaming_time_to_first_token,
streaming_time_to_generate,
start_time,
kwargs,
)
else:
return _build_from_streaming_response(
span,
response,
instance,
token_counter,
choice_counter,
duration_histogram,
streaming_time_to_first_token,
streaming_time_to_generate,
start_time,
kwargs,
)

span.end()
duration = end_time - start_time

return response
_handle_response(
response,
span,
instance,
token_counter,
choice_counter,
duration_histogram,
duration,
)

span.end()

return response


@_with_chat_telemetry_wrapper
@@ -182,78 +185,80 @@ async def achat_wrapper(
attributes={SpanAttributes.LLM_REQUEST_TYPE: LLM_REQUEST_TYPE.value},
)

await _handle_request(span, kwargs, instance)
# Use the span as current context to ensure events get proper trace context
with trace.use_span(span, end_on_exit=False):
await _handle_request(span, kwargs, instance)

try:
start_time = time.time()
response = await wrapped(*args, **kwargs)
end_time = time.time()
except Exception as e: # pylint: disable=broad-except
end_time = time.time()
duration = end_time - start_time if "start_time" in locals() else 0

common_attributes = Config.get_common_metrics_attributes()
attributes = {
**common_attributes,
"error.type": e.__class__.__name__,
}

if duration > 0 and duration_histogram:
duration_histogram.record(duration, attributes=attributes)
if exception_counter:
exception_counter.add(1, attributes=attributes)

span.set_attribute(ERROR_TYPE, e.__class__.__name__)
span.record_exception(e)
span.set_status(Status(StatusCode.ERROR, str(e)))
span.end()
try:
start_time = time.time()
response = await wrapped(*args, **kwargs)
end_time = time.time()
except Exception as e: # pylint: disable=broad-except
end_time = time.time()
duration = end_time - start_time if "start_time" in locals() else 0

common_attributes = Config.get_common_metrics_attributes()
attributes = {
**common_attributes,
"error.type": e.__class__.__name__,
}

raise
if duration > 0 and duration_histogram:
duration_histogram.record(duration, attributes=attributes)
if exception_counter:
exception_counter.add(1, attributes=attributes)

if is_streaming_response(response):
# span will be closed after the generator is done
if is_openai_v1():
return ChatStream(
span,
response,
instance,
token_counter,
choice_counter,
duration_histogram,
streaming_time_to_first_token,
streaming_time_to_generate,
start_time,
kwargs,
)
else:
return _abuild_from_streaming_response(
span,
response,
instance,
token_counter,
choice_counter,
duration_histogram,
streaming_time_to_first_token,
streaming_time_to_generate,
start_time,
kwargs,
)
span.set_attribute(ERROR_TYPE, e.__class__.__name__)
span.record_exception(e)
span.set_status(Status(StatusCode.ERROR, str(e)))
span.end()

duration = end_time - start_time
raise

_handle_response(
response,
span,
instance,
token_counter,
choice_counter,
duration_histogram,
duration,
)
if is_streaming_response(response):
# span will be closed after the generator is done
if is_openai_v1():
return ChatStream(
span,
response,
instance,
token_counter,
choice_counter,
duration_histogram,
streaming_time_to_first_token,
streaming_time_to_generate,
start_time,
kwargs,
)
else:
return _abuild_from_streaming_response(
span,
response,
instance,
token_counter,
choice_counter,
duration_histogram,
streaming_time_to_first_token,
streaming_time_to_generate,
start_time,
kwargs,
)

span.end()
duration = end_time - start_time

return response
_handle_response(
response,
span,
instance,
token_counter,
choice_counter,
duration_histogram,
duration,
)

span.end()

return response


@dont_throw
@@ -1,6 +1,7 @@
import logging

from opentelemetry import context as context_api
from opentelemetry import trace
from opentelemetry.instrumentation.openai.shared import (
_set_client_attributes,
_set_functions_attributes,
@@ -55,25 +56,27 @@ def completion_wrapper(tracer, wrapped, instance, args, kwargs):
attributes={SpanAttributes.LLM_REQUEST_TYPE: LLM_REQUEST_TYPE.value},
)

_handle_request(span, kwargs, instance)
# Use the span as current context to ensure events get proper trace context
with trace.use_span(span, end_on_exit=False):
_handle_request(span, kwargs, instance)

try:
response = wrapped(*args, **kwargs)
except Exception as e:
span.set_attribute(ERROR_TYPE, e.__class__.__name__)
span.record_exception(e)
span.set_status(Status(StatusCode.ERROR, str(e)))
span.end()
raise

if is_streaming_response(response):
# span will be closed after the generator is done
return _build_from_streaming_response(span, kwargs, response)
else:
_handle_response(response, span, instance)

Comment on lines +72 to 77

💡 Verification agent

🧩 Analysis chain

Streaming path loses the current context for downstream events; wrap generator bodies with trace.use_span

When you return the streaming generator, the with trace.use_span context is exited. emit_event calls inside _build_from_streaming_response/_abuild_from_streaming_response will then run without a current span, reintroducing the missing span_id problem for streaming events.
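The root cause is plain generator laziness — the body only runs when iterated, after the caller's with block has already exited (standard-library sketch, not PR code):

from contextlib import contextmanager

@contextmanager
def ctx():
    print("enter")
    yield
    print("exit")

def gen():
    print("body runs now")  # executes on first next(), not at call time
    yield 1

def caller():
    with ctx():
        return gen()  # returning exits ctx() before the body ever runs

g = caller()  # prints: enter, exit
next(g)       # prints: body runs now -- outside ctx()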

Wrap the generator bodies with trace.use_span(span, end_on_exit=False) to keep the span current throughout iteration and finalization.

Apply:

 def _build_from_streaming_response(span, request_kwargs, response):
-    complete_response = {"choices": [], "model": "", "id": ""}
-    for item in response:
-        yield item
-        _accumulate_streaming_response(complete_response, item)
-    _set_response_attributes(span, complete_response)
-    _set_token_usage(span, request_kwargs, complete_response)
-    if should_emit_events():
-        _emit_streaming_response_events(complete_response)
-    else:
-        if should_send_prompts():
-            _set_completions(span, complete_response.get("choices"))
-    span.set_status(Status(StatusCode.OK))
-    span.end()
+    # Keep span as current during the whole streaming lifecycle
+    with trace.use_span(span, end_on_exit=False):
+        complete_response = {"choices": [], "model": "", "id": ""}
+        for item in response:
+            yield item
+            _accumulate_streaming_response(complete_response, item)
+        _set_response_attributes(span, complete_response)
+        _set_token_usage(span, request_kwargs, complete_response)
+        if should_emit_events():
+            _emit_streaming_response_events(complete_response)
+        else:
+            if should_send_prompts():
+                _set_completions(span, complete_response.get("choices"))
+        span.set_status(Status(StatusCode.OK))
+        span.end()
 
 async def _abuild_from_streaming_response(span, request_kwargs, response):
-    complete_response = {"choices": [], "model": "", "id": ""}
-    async for item in response:
-        yield item
-        _accumulate_streaming_response(complete_response, item)
-    _set_response_attributes(span, complete_response)
-    _set_token_usage(span, request_kwargs, complete_response)
-    if should_emit_events():
-        _emit_streaming_response_events(complete_response)
-    else:
-        if should_send_prompts():
-            _set_completions(span, complete_response.get("choices"))
-    span.set_status(Status(StatusCode.OK))
-    span.end()
+    # Keep span as current during the whole streaming lifecycle
+    with trace.use_span(span, end_on_exit=False):
+        complete_response = {"choices": [], "model": "", "id": ""}
+        async for item in response:
+            yield item
+            _accumulate_streaming_response(complete_response, item)
+        _set_response_attributes(span, complete_response)
+        _set_token_usage(span, request_kwargs, complete_response)
+        if should_emit_events():
+            _emit_streaming_response_events(complete_response)
+        else:
+            if should_send_prompts():
+                _set_completions(span, complete_response.get("choices"))
+        span.set_status(Status(StatusCode.OK))
+        span.end()

Run this quick grep to spot any remaining emit_event sites in streaming code that run outside a use_span context:

Also applies to: 108-115, 183-201, 204-222


🏁 Script executed:

#!/bin/bash
rg -n "emit_event\(" -A 4 -B 6 packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/completion_wrappers.py

Length of output: 1359


I’d like to verify whether any trace.use_span wrappers already surround non-streaming paths or other emit sites. Let’s search for existing use_span usages in this file:


🏁 Script executed:

rg -n "use_span" -A 3 -B 3 packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/completion_wrappers.py

Length of output: 582


Wrap streaming generators in trace.use_span to preserve current span context

Returning the raw generator exits the outer use_span (though the span remains open), so any emit_event calls inside the streaming builders run without a current span. Enclose the bodies of both streaming helpers in a nested with trace.use_span(span, end_on_exit=False): block so that events, status updates, and span.end() execute under the correct context.

Affected locations in
packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/completion_wrappers.py:

  • _build_from_streaming_response
  • _abuild_from_streaming_response

Suggested patch:

 def _build_from_streaming_response(span, request_kwargs, response):
-    complete_response = {"choices": [], "model": "", "id": ""}
-    for item in response:
-        yield item
-        _accumulate_streaming_response(complete_response, item)
-    _set_response_attributes(span, complete_response)
-    _set_token_usage(span, request_kwargs, complete_response)
-    if should_emit_events():
-        _emit_streaming_response_events(complete_response)
-    else:
-        if should_send_prompts():
-            _set_completions(span, complete_response.get("choices"))
-    span.set_status(Status(StatusCode.OK))
-    span.end()
+    # Keep span current for events and finalization
+    with trace.use_span(span, end_on_exit=False):
+        complete_response = {"choices": [], "model": "", "id": ""}
+        for item in response:
+            yield item
+            _accumulate_streaming_response(complete_response, item)
+        _set_response_attributes(span, complete_response)
+        _set_token_usage(span, request_kwargs, complete_response)
+        if should_emit_events():
+            _emit_streaming_response_events(complete_response)
+        elif should_send_prompts():
+            _set_completions(span, complete_response.get("choices"))
+        span.set_status(Status(StatusCode.OK))
+        span.end()
 
 async def _abuild_from_streaming_response(span, request_kwargs, response):
-    complete_response = {"choices": [], "model": "", "id": ""}
-    async for item in response:
-        yield item
-        _accumulate_streaming_response(complete_response, item)
-    _set_response_attributes(span, complete_response)
-    _set_token_usage(span, request_kwargs, complete_response)
-    if should_emit_events():
-        _emit_streaming_response_events(complete_response)
-    else:
-        if should_send_prompts():
-            _set_completions(span, complete_response.get("choices"))
-    span.set_status(Status(StatusCode.OK))
-    span.end()
+    # Keep span current for events and finalization
+    with trace.use_span(span, end_on_exit=False):
+        complete_response = {"choices": [], "model": "", "id": ""}
+        async for item in response:
+            yield item
+            _accumulate_streaming_response(complete_response, item)
+        _set_response_attributes(span, complete_response)
+        _set_token_usage(span, request_kwargs, complete_response)
+        if should_emit_events():
+            _emit_streaming_response_events(complete_response)
+        elif should_send_prompts():
+            _set_completions(span, complete_response.get("choices"))
+        span.set_status(Status(StatusCode.OK))
+        span.end()
🤖 Prompt for AI Agents
In
packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/completion_wrappers.py
around lines 72 to 77, the streaming generator helpers
(_build_from_streaming_response and _abuild_from_streaming_response) return raw
generators which causes code executed inside them (emit_event, set_status,
span.end()) to run without the current span context; wrap the entire body of
each streaming helper in a nested with trace.use_span(span, end_on_exit=False):
block so the yielded generator executes with the span active, ensuring
emit_event/status/span.end() run under the correct context (apply the same
change to both sync and async variants).
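
A quick way to sanity-check the fixed helpers (sketch, not part of the PR; fake_stream and the inline build helper are stand-ins for the real streaming builder, and an SDK TracerProvider is assumed):

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider

trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)

def fake_stream():
    yield {"choices": []}

def build(span, response):
    # Same shape as the patched _build_from_streaming_response.
    with trace.use_span(span, end_on_exit=False):
        for item in response:
            # emit_event called here would now see `span` as current.
            assert trace.get_current_span() is span
            yield item
    span.end()

span = tracer.start_span("openai.chat")
list(build(span, fake_stream()))  # asserts pass; span ends afterwards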

try:
response = wrapped(*args, **kwargs)
except Exception as e:
span.set_attribute(ERROR_TYPE, e.__class__.__name__)
span.record_exception(e)
span.set_status(Status(StatusCode.ERROR, str(e)))
span.end()
raise

if is_streaming_response(response):
# span will be closed after the generator is done
return _build_from_streaming_response(span, kwargs, response)
else:
_handle_response(response, span, instance)

span.end()
return response
return response


@_with_tracer_wrapper
@@ -89,25 +92,27 @@ async def acompletion_wrapper(tracer, wrapped, instance, args, kwargs):
attributes={SpanAttributes.LLM_REQUEST_TYPE: LLM_REQUEST_TYPE.value},
)

_handle_request(span, kwargs, instance)
# Use the span as current context to ensure events get proper trace context
with trace.use_span(span, end_on_exit=False):
_handle_request(span, kwargs, instance)

try:
response = await wrapped(*args, **kwargs)
except Exception as e:
span.set_attribute(ERROR_TYPE, e.__class__.__name__)
span.record_exception(e)
span.set_status(Status(StatusCode.ERROR, str(e)))
span.end()
raise

if is_streaming_response(response):
# span will be closed after the generator is done
return _abuild_from_streaming_response(span, kwargs, response)
else:
_handle_response(response, span, instance)

try:
response = await wrapped(*args, **kwargs)
except Exception as e:
span.set_attribute(ERROR_TYPE, e.__class__.__name__)
span.record_exception(e)
span.set_status(Status(StatusCode.ERROR, str(e)))
span.end()
raise

if is_streaming_response(response):
# span will be closed after the generator is done
return _abuild_from_streaming_response(span, kwargs, response)
else:
_handle_response(response, span, instance)

span.end()
return response
return response


@dont_throw