Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
emit_message_events,
)
from opentelemetry.instrumentation.google_generativeai.span_utils import (
set_input_attributes,
set_input_attributes_sync,
set_model_request_attributes,
set_model_response_attributes,
set_response_attributes,
Expand Down Expand Up @@ -103,7 +103,7 @@ def _handle_request(span, args, kwargs, llm_model, event_logger):
if should_emit_events() and event_logger:
emit_message_events(args, kwargs, event_logger)
else:
set_input_attributes(span, args, kwargs, llm_model)
set_input_attributes_sync(span, args, kwargs, llm_model)

set_model_request_attributes(span, kwargs, llm_model)

Expand Down Expand Up @@ -249,10 +249,12 @@ def _wrap(
class GoogleGenerativeAiInstrumentor(BaseInstrumentor):
"""An instrumentor for Google Generative AI's client library."""

def __init__(self, exception_logger=None, use_legacy_attributes=True):
def __init__(self, exception_logger=None, use_legacy_attributes=True, upload_base64_image=None):
super().__init__()
Config.exception_logger = exception_logger
Config.use_legacy_attributes = use_legacy_attributes
if upload_base64_image:
Config.upload_base64_image = upload_base64_image

def instrumentation_dependencies(self) -> Collection[str]:
return ("google-genai >= 1.0.0",)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
from typing import Callable


class Config:
exception_logger = None
use_legacy_attributes = True
upload_base64_image: Callable[[str, str, str, str], str] = (
lambda trace_id, span_id, image_name, base64_string: str

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The lambda returns the type 'str' instead of a string value. This default no-op doesn't match expected behavior. Consider returning a valid string (or an async no-op function) instead of the type.

Suggested change
lambda trace_id, span_id, image_name, base64_string: str
lambda trace_id, span_id, image_name, base64_string: ""

)
Comment on lines +7 to +9

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Fix default upload_base64_image: current lambda returns the str type, not a URL string

The default should return a valid string (ideally a harmless placeholder URL) rather than the str class object. As-is, any call site that uses the default will get <class 'str'>, which breaks downstream JSON formatting for image_url.url and can crash or silently misbehave.

Apply this diff:

-    upload_base64_image: Callable[[str, str, str, str], str] = (
-        lambda trace_id, span_id, image_name, base64_string: str
-    )
+    # Default: do not upload, but return a harmless placeholder URL.
+    upload_base64_image: Callable[[str, str, str, str], str] = (
+        lambda trace_id, span_id, image_name, base64_string: "about:blank"
+    )

Optionally, consider letting this be Optional[Callable] with None default and handling the fallback in span_utils for more explicit behavior. I can draft that change across both google_generativeai and vertexai configs to keep them consistent.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
upload_base64_image: Callable[[str, str, str, str], str] = (
lambda trace_id, span_id, image_name, base64_string: str
)
# Default: do not upload, but return a harmless placeholder URL.
upload_base64_image: Callable[[str, str, str, str], str] = (
lambda trace_id, span_id, image_name, base64_string: "about:blank"
)
🤖 Prompt for AI Agents
In
packages/opentelemetry-instrumentation-google-generativeai/opentelemetry/instrumentation/google_generativeai/config.py
around lines 7-9, the default upload_base64_image lambda currently returns the
str class object rather than a URL string; replace the default implementation so
it returns a harmless placeholder URL string (e.g.
"https://example.com/placeholder.png") instead of str; alternatively, if you
prefer the Optional approach, change the type to Optional[Callable[[str, str,
str, str], str]] = None and handle the fallback where upload_base64_image is
invoked (e.g., in span_utils) to return a placeholder URL when None.

Original file line number Diff line number Diff line change
@@ -1,22 +1,113 @@
import json
import base64
import logging
import asyncio
from opentelemetry.instrumentation.google_generativeai.utils import (
dont_throw,
should_send_prompts,
)
from opentelemetry.instrumentation.google_generativeai.config import Config
from opentelemetry.semconv_ai import (
SpanAttributes,
)
from opentelemetry.trace.status import Status, StatusCode


logger = logging.getLogger(__name__)


def _set_span_attribute(span, name, value):
if value is not None:
if value != "":
span.set_attribute(name, value)
return


def _is_image_part(item):
"""Check if item is a Google GenAI Part object containing image data"""
try:
# Check if it has the Part attributes we expect for new Google GenAI SDK
if hasattr(item, 'inline_data') and item.inline_data is not None:
# Check if it's an image mime type and has data
if (hasattr(item.inline_data, 'mime_type') and
item.inline_data.mime_type and
'image/' in item.inline_data.mime_type and
hasattr(item.inline_data, 'data') and
item.inline_data.data):
return True
return False
except Exception:
return False
Comment thread
nirga marked this conversation as resolved.


async def _process_image_part(item, trace_id, span_id, content_index):
"""Process a Google GenAI Part object containing image data"""
if not Config.upload_base64_image:
return item

try:
# Extract format from mime type (e.g., 'image/jpeg' -> 'jpeg')
image_format = item.inline_data.mime_type.split('/')[1] if item.inline_data.mime_type else 'unknown'
image_name = f"content_{content_index}.{image_format}"

# Convert binary data to base64 string for upload
binary_data = item.inline_data.data
base64_string = base64.b64encode(binary_data).decode('utf-8')

# Upload the base64 data
url = await Config.upload_base64_image(trace_id, span_id, image_name, base64_string)

# Return OpenAI-compatible format for consistency across LLM providers
return {
"type": "image_url",
"image_url": {"url": url}
}
except Exception as e:
logger.warning(f"Failed to process image part: {e}")
# Return fallback in OpenAI-compatible format
return {
"type": "image_url",
"image_url": {"url": "/fallback/async_image"}
Comment thread
nirga marked this conversation as resolved.
Outdated
}

Comment thread
coderabbitai[bot] marked this conversation as resolved.
Comment on lines +72 to +85

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

run_async swallows exceptions and does not return results; fix and use the return value

In the sync path, exceptions raised inside the coroutine terminate the thread and are lost; url remains None, and the caller returns an invalid image_url. Make run_async return the coroutine result and propagate exceptions. Then simplify the sync uploader to use the returned value rather than a nonlocal side channel.

Apply this diff:

@@
-def run_async(method):
-    """Handle async method in sync context, following OpenAI's battle-tested approach"""
+def run_async(coro):
+    """Run a coroutine from sync code; return its result and propagate exceptions."""
     try:
         loop = asyncio.get_running_loop()
     except RuntimeError:
         loop = None
 
-    if loop and loop.is_running():
-        thread = threading.Thread(target=lambda: asyncio.run(method))
-        thread.start()
-        thread.join()
-    else:
-        asyncio.run(method)
+    if loop and loop.is_running():
+        result = {"value": None, "exc": None}
+
+        def runner():
+            try:
+                result["value"] = asyncio.run(coro)
+            except Exception as e:
+                result["exc"] = e
+
+        thread = threading.Thread(target=runner, daemon=True)
+        thread.start()
+        thread.join()
+        if result["exc"]:
+            raise result["exc"]
+        return result["value"]
+    else:
+        return asyncio.run(coro)
@@
-        # Use OpenAI's run_async pattern to handle the async upload function
-        url = None
-
-        async def upload_task():
-            nonlocal url
-            url = await Config.upload_base64_image(str(trace_id), str(span_id), image_name, base64_string)
-
-        run_async(upload_task())
+        # Use run_async to bridge the async upload function in a sync context
+        async def upload_task():
+            return await Config.upload_base64_image(str(trace_id), str(span_id), image_name, base64_string)
+        url = run_async(upload_task())

Also applies to: 101-109


def _process_image_part_sync(item, trace_id, span_id, content_index):
"""Synchronous version of image part processing"""
if not Config.upload_base64_image:
return item

try:
# Extract format from mime type (e.g., 'image/jpeg' -> 'jpeg')
image_format = item.inline_data.mime_type.split('/')[1] if item.inline_data.mime_type else 'unknown'
image_name = f"content_{content_index}.{image_format}"

# Convert binary data to base64 string for upload
binary_data = item.inline_data.data
base64_string = base64.b64encode(binary_data).decode('utf-8')

# Use asyncio.run to call the async upload function in sync context
try:
url = asyncio.run(Config.upload_base64_image(trace_id, span_id, image_name, base64_string))
except Exception as upload_error:
logger.warning(f"Failed to upload image: {upload_error}")
url = f"/image/{image_name}" # Fallback URL

return {
"type": "image_url",
"image_url": {"url": url}
}
except Exception as e:
logger.warning(f"Failed to process image part sync: {e}")
# Return fallback in OpenAI-compatible format
return {
"type": "image_url",
"image_url": {"url": "/fallback/sync_image"}
}


@dont_throw
def set_input_attributes(span, args, kwargs, llm_model):
async def set_input_attributes(span, args, kwargs, llm_model):
"""Process input arguments, handling both text and image content"""
Comment on lines 204 to +206

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

dont_throw on an async function won’t catch in-coroutine exceptions

@dont_throw as implemented in utils.py is synchronous and won’t catch exceptions raised inside this async def set_input_attributes(...). This silently bypasses the intended shielding/logging.

Two viable fixes (A preferred):

  • A) Update the decorator in utils.py to handle coroutine functions.
  • B) Remove the decorator here and add an internal try/except.

A) Example (utils.py):

import functools, inspect, logging, traceback

def dont_throw(func):
    logger = logging.getLogger(func.__module__)

    if inspect.iscoroutinefunction(func):
        @functools.wraps(func)
        async def async_wrapper(*args, **kwargs):
            try:
                return await func(*args, **kwargs)
            except Exception as e:
                logger.debug(
                    "OpenLLMetry failed to trace in %s, error: %s",
                    func.__name__,
                    traceback.format_exc(),
                )
                if Config.exception_logger:
                    Config.exception_logger(e)
        return async_wrapper

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            logger.debug(
                "OpenLLMetry failed to trace in %s, error: %s",
                func.__name__,
                traceback.format_exc(),
            )
            if Config.exception_logger:
                Config.exception_logger(e)
    return wrapper

Would you like me to open a follow-up PR to implement (A) and sweep for other async uses of @dont_throw?

if not span.is_recording():
Comment on lines 204 to 207

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

dont_throw wraps an async function incorrectly — exceptions inside the coroutine won’t be caught

The current dont_throw decorator is synchronous and won’t catch exceptions raised inside async def set_input_attributes(...). This silently disables the intended error shielding/logging for this async path.

Two viable fixes (prefer A for reuse):

  • A) Make the decorator support coroutine functions globally (utils.py). Keep the decorator here unchanged.
  • B) Remove the decorator from the async function and handle try/except inside the function body.

A) Update decorator (utils.py), then keep using @dont_throw here:

# utils.py
import functools, inspect, logging, traceback

def dont_throw(func):
    logger = logging.getLogger(func.__module__)

    if inspect.iscoroutinefunction(func):
        @functools.wraps(func)
        async def async_wrapper(*args, **kwargs):
            try:
                return await func(*args, **kwargs)
            except Exception as e:
                logger.debug(
                    "OpenLLMetry failed to trace in %s, error: %s",
                    func.__name__,
                    traceback.format_exc(),
                )
                if Config.exception_logger:
                    Config.exception_logger(e)
        return async_wrapper

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            logger.debug(
                "OpenLLMetry failed to trace in %s, error: %s",
                func.__name__,
                traceback.format_exc(),
            )
            if Config.exception_logger:
                Config.exception_logger(e)
    return wrapper

B) Alternatively, drop the decorator here and wrap the function body with try/except.

🤖 Prompt for AI Agents
packages/opentelemetry-instrumentation-google-generativeai/opentelemetry/instrumentation/google_generativeai/span_utils.py
around lines 98-101: the dont_throw decorator is synchronous so exceptions
raised inside this async function won’t be caught; fix by updating dont_throw in
utils.py to detect coroutine functions (inspect.iscoroutinefunction) and return
an async wrapper that awaits the function and catches/logs exceptions
(preferred, then keep @dont_throw here), or as an alternative remove the
decorator here and add an internal try/except in set_input_attributes to
catch/log exceptions.

Comment on lines +205 to 207

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Signature change to async: audit call sites to ensure proper awaiting or use of the sync wrapper

set_input_attributes is now async. Ensure instrumentors call await set_input_attributes(...) or use set_input_attributes_sync(...) in synchronous code paths.

Run:

Expected: All real invocations in async contexts are awaited; sync contexts use set_input_attributes_sync.

Also applies to: 212-216


🏁 Script executed:

#!/bin/bash
# Find all call sites
rg -nP --type=py -C2 '\bset_input_attributes\s*\('

echo
echo "Calls that are awaited:"
rg -nP --type=py -C2 'await\s+set_input_attributes\s*\('

echo
echo "Suspicious un-awaited calls (may be false positives if they call the _sync version):"
rg -nP --type=py -C2 '(?<!await\s)\bset_input_attributes\s*\(' | rg -v '_sync\s*\('

Length of output: 17620


🏁 Script executed:

#!/bin/bash
# Display the head of the async span_utils implementation
sed -n '1,200p' packages/opentelemetry-instrumentation-google-generativeai/opentelemetry/instrumentation/google_generativeai/span_utils.py

echo
echo "— Sync wrapper definitions in span_utils.py —"
rg -nP --type=py '_sync' -n packages/opentelemetry-instrumentation-google-generativeai/opentelemetry/instrumentation/google_generativeai/span_utils.py

echo
echo "— Calls to async set_input_attributes in __init__.py —"
rg -nP --type=py -C2 '\bset_input_attributes\s*\(' packages/opentelemetry-instrumentation-google-generativeai/opentelemetry/instrumentation/google_generativeai/__init__.py

echo
echo "— Calls to sync set_input_attributes_sync in __init__.py —"
rg -nP --type=py -C2 '\bset_input_attributes_sync\s*\(' packages/opentelemetry-instrumentation-google-generativeai/opentelemetry/instrumentation/google_generativeai/__init__.py

Length of output: 9700


🏁 Script executed:

#!/bin/bash
# Show the first 200 lines of the google_generativeai instrumentation __init__.py for context
sed -n '1,200p' packages/opentelemetry-instrumentation-google-generativeai/opentelemetry/instrumentation/google_generativeai/__init__.py

Length of output: 5948


Fix async wrapper to use the async set_input_attributes instead of the sync shim

The async instrumentation in
packages/opentelemetry-instrumentation-google-generativeai/opentelemetry/instrumentation/google_generativeai/__init__.py
still calls the synchronous helper (_handle_requestset_input_attributes_sync), which invokes asyncio.run(...) inside an already-running event loop. In a live async context this will raise RuntimeError: asyncio.run() cannot be called from a running event loop, breaking prompt-attribute recording.

• Location: async wrapper _awrap (around line 155)
• Current code:

@_with_tracer_wrapper
async def _awrap(...):
    ...
    _handle_request(span, args, kwargs, llm_model, event_logger)
    response = await wrapped(*args, **kwargs)
    ...

• Proposed change: call the async setter directly (and preserve the dont_throw behavior):

-    _handle_request(span, args, kwargs, llm_model, event_logger)
+    # use async setter in async contexts
+    try:
+        if should_emit_events() and event_logger:
+            emit_message_events(args, kwargs, event_logger)
+        else:
+            await set_input_attributes(span, args, kwargs, llm_model)
+    except Exception:
+        # swallow per @dont_throw contract
+        pass
     response = await wrapped(*args, **kwargs)

• Remove or deprecate _handle_request for async use, or split into separate sync/async handlers.
• Verify the sync wrapper (_handle_requestset_input_attributes_sync) remains used in the sync _wrap.

This change ensures that in async flows we properly await set_input_attributes, and only use the sync shim in purely synchronous instrumentation.

return

Expand All @@ -26,53 +117,211 @@ def set_input_attributes(span, args, kwargs, llm_model):
if "contents" in kwargs:
contents = kwargs["contents"]
if isinstance(contents, str):
# Simple string content in OpenAI format
_set_span_attribute(
span,
f"{SpanAttributes.LLM_PROMPTS}.0.content",
contents,
json.dumps([{"type": "text", "text": contents}]),
)
_set_span_attribute(
span,
f"{SpanAttributes.LLM_PROMPTS}.0.role",
"user",
)
elif isinstance(contents, list):
# Process content list - could be mixed text and Part objects
for i, content in enumerate(contents):
processed_content = []

if hasattr(content, "parts"):
for part in content.parts:
if hasattr(part, "text"):
_set_span_attribute(
span,
f"{SpanAttributes.LLM_PROMPTS}.{i}.content",
part.text,
)
_set_span_attribute(
span,
f"{SpanAttributes.LLM_PROMPTS}.{i}.role",
getattr(content, "role", "user"),
# Content with parts (Google GenAI Content object)
Comment thread
nirga marked this conversation as resolved.
Outdated
for j, part in enumerate(content.parts):
if hasattr(part, "text") and part.text:
processed_content.append({"type": "text", "text": part.text})
elif _is_image_part(part):
processed_image = await _process_image_part(
part, span.context.trace_id, span.context.span_id, j
)
processed_content.append(processed_image)
else:
# Other part types
processed_content.append({"type": "text", "text": str(part)})
elif isinstance(content, str):
# Direct string in the list
processed_content.append({"type": "text", "text": content})
elif _is_image_part(content):
# Direct Part object that's an image
processed_image = await _process_image_part(
content, span.context.trace_id, span.context.span_id, 0
)
processed_content.append(processed_image)
else:
# Other content types
processed_content.append({"type": "text", "text": str(content)})

if processed_content:
_set_span_attribute(
span,
f"{SpanAttributes.LLM_PROMPTS}.{i}.content",
json.dumps(processed_content),
)
_set_span_attribute(
span,
f"{SpanAttributes.LLM_PROMPTS}.{i}.role",
getattr(content, "role", "user"),
)
elif args and len(args) > 0:
prompt = ""
for arg in args:
# Handle args - process each argument
for i, arg in enumerate(args):
processed_content = []

if isinstance(arg, str):
prompt = f"{prompt}{arg}\n"
processed_content.append({"type": "text", "text": arg})
elif isinstance(arg, list):
for subarg in arg:
prompt = f"{prompt}{subarg}\n"
if prompt:
for j, subarg in enumerate(arg):
Comment thread
nirga marked this conversation as resolved.
Outdated
if isinstance(subarg, str):
processed_content.append({"type": "text", "text": subarg})
elif _is_image_part(subarg):
processed_image = await _process_image_part(
subarg, span.context.trace_id, span.context.span_id, j
)
processed_content.append(processed_image)
else:
processed_content.append({"type": "text", "text": str(subarg)})
elif _is_image_part(arg):
processed_image = await _process_image_part(
arg, span.context.trace_id, span.context.span_id, 0
)
processed_content.append(processed_image)
else:
processed_content.append({"type": "text", "text": str(arg)})

if processed_content:
_set_span_attribute(
span,
f"{SpanAttributes.LLM_PROMPTS}.{i}.content",
json.dumps(processed_content),
)
_set_span_attribute(
span,
f"{SpanAttributes.LLM_PROMPTS}.{i}.role",
"user",
)
elif "prompt" in kwargs:
_set_span_attribute(
span, f"{SpanAttributes.LLM_PROMPTS}.0.content",
json.dumps([{"type": "text", "text": kwargs["prompt"]}])
)
_set_span_attribute(span, f"{SpanAttributes.LLM_PROMPTS}.0.role", "user")


# Keep sync version for backward compatibility
@dont_throw
def set_input_attributes_sync(span, args, kwargs, llm_model):
"""Synchronous version with image processing support"""
if not span.is_recording():
return

if not should_send_prompts():
return

if "contents" in kwargs:
contents = kwargs["contents"]
if isinstance(contents, str):
# Simple string content in OpenAI format
_set_span_attribute(
span,
f"{SpanAttributes.LLM_PROMPTS}.0.content",
prompt,
json.dumps([{"type": "text", "text": contents}]),
)
_set_span_attribute(
span,
f"{SpanAttributes.LLM_PROMPTS}.0.role",
"user",
)
elif isinstance(contents, list):
Comment thread
nirga marked this conversation as resolved.
# Process content list - could be mixed text and Part objects
for i, content in enumerate(contents):
processed_content = []

if hasattr(content, "parts"):
# Content with parts (Google GenAI Content object)
for j, part in enumerate(content.parts):
if hasattr(part, "text") and part.text:
processed_content.append({"type": "text", "text": part.text})
elif _is_image_part(part):
processed_image = _process_image_part_sync(
part, span.context.trace_id, span.context.span_id, j
)
processed_content.append(processed_image)
else:
# Other part types
processed_content.append({"type": "text", "text": str(part)})
elif isinstance(content, str):
# Direct string in the list
processed_content.append({"type": "text", "text": content})
elif _is_image_part(content):
# Direct Part object that's an image
processed_image = _process_image_part_sync(
content, span.context.trace_id, span.context.span_id, 0
)
processed_content.append(processed_image)
else:
# Other content types
processed_content.append({"type": "text", "text": str(content)})

if processed_content:
_set_span_attribute(
span,
f"{SpanAttributes.LLM_PROMPTS}.{i}.content",
json.dumps(processed_content),
)
_set_span_attribute(
span,
f"{SpanAttributes.LLM_PROMPTS}.{i}.role",
getattr(content, "role", "user"),
)
elif args and len(args) > 0:
# Handle args - process each argument
for i, arg in enumerate(args):
processed_content = []

if isinstance(arg, str):
processed_content.append({"type": "text", "text": arg})
elif isinstance(arg, list):
for j, subarg in enumerate(arg):
if isinstance(subarg, str):
processed_content.append({"type": "text", "text": subarg})
elif _is_image_part(subarg):
processed_image = _process_image_part_sync(
subarg, span.context.trace_id, span.context.span_id, j
)
processed_content.append(processed_image)
else:
processed_content.append({"type": "text", "text": str(subarg)})
elif _is_image_part(arg):
processed_image = _process_image_part_sync(
arg, span.context.trace_id, span.context.span_id, 0
)
processed_content.append(processed_image)
else:
processed_content.append({"type": "text", "text": str(arg)})

if processed_content:
_set_span_attribute(
span,
f"{SpanAttributes.LLM_PROMPTS}.{i}.content",
json.dumps(processed_content),
)
_set_span_attribute(
span,
f"{SpanAttributes.LLM_PROMPTS}.{i}.role",
"user",
)
elif "prompt" in kwargs:
_set_span_attribute(
span, f"{SpanAttributes.LLM_PROMPTS}.0.content", kwargs["prompt"]
span, f"{SpanAttributes.LLM_PROMPTS}.0.content",
json.dumps([{"type": "text", "text": kwargs["prompt"]}])
)
_set_span_attribute(span, f"{SpanAttributes.LLM_PROMPTS}.0.role", "user")

Expand Down
Loading
Loading