Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
emit_message_events,
)
from opentelemetry.instrumentation.google_generativeai.span_utils import (
set_input_attributes,
set_input_attributes_sync,
set_model_request_attributes,
set_model_response_attributes,
set_response_attributes,
Expand Down Expand Up @@ -103,7 +103,7 @@ def _handle_request(span, args, kwargs, llm_model, event_logger):
if should_emit_events() and event_logger:
emit_message_events(args, kwargs, event_logger)
else:
set_input_attributes(span, args, kwargs, llm_model)
set_input_attributes_sync(span, args, kwargs, llm_model)

set_model_request_attributes(span, kwargs, llm_model)

Expand Down Expand Up @@ -249,10 +249,12 @@ def _wrap(
class GoogleGenerativeAiInstrumentor(BaseInstrumentor):
"""An instrumentor for Google Generative AI's client library."""

def __init__(self, exception_logger=None, use_legacy_attributes=True):
def __init__(self, exception_logger=None, use_legacy_attributes=True, upload_base64_image=None):
super().__init__()
Config.exception_logger = exception_logger
Config.use_legacy_attributes = use_legacy_attributes
if upload_base64_image:
Config.upload_base64_image = upload_base64_image

def instrumentation_dependencies(self) -> Collection[str]:
return ("google-genai >= 1.0.0",)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
from typing import Callable


class Config:
exception_logger = None
use_legacy_attributes = True
upload_base64_image: Callable[[str, str, str, str], str] = (
lambda trace_id, span_id, image_name, base64_string: str

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The lambda returns the type 'str' instead of a string value. This default no-op doesn't match expected behavior. Consider returning a valid string (or an async no-op function) instead of the type.

Suggested change
lambda trace_id, span_id, image_name, base64_string: str
lambda trace_id, span_id, image_name, base64_string: ""

)
Comment on lines +7 to +9

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Fix default upload_base64_image: current lambda returns the str type, not a URL string

The default should return a valid string (ideally a harmless placeholder URL) rather than the str class object. As-is, any call site that uses the default will get <class 'str'>, which breaks downstream JSON formatting for image_url.url and can crash or silently misbehave.

Apply this diff:

-    upload_base64_image: Callable[[str, str, str, str], str] = (
-        lambda trace_id, span_id, image_name, base64_string: str
-    )
+    # Default: do not upload, but return a harmless placeholder URL.
+    upload_base64_image: Callable[[str, str, str, str], str] = (
+        lambda trace_id, span_id, image_name, base64_string: "about:blank"
+    )

Optionally, consider letting this be Optional[Callable] with None default and handling the fallback in span_utils for more explicit behavior. I can draft that change across both google_generativeai and vertexai configs to keep them consistent.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
upload_base64_image: Callable[[str, str, str, str], str] = (
lambda trace_id, span_id, image_name, base64_string: str
)
# Default: do not upload, but return a harmless placeholder URL.
upload_base64_image: Callable[[str, str, str, str], str] = (
lambda trace_id, span_id, image_name, base64_string: "about:blank"
)
🤖 Prompt for AI Agents
In
packages/opentelemetry-instrumentation-google-generativeai/opentelemetry/instrumentation/google_generativeai/config.py
around lines 7-9, the default upload_base64_image lambda currently returns the
str class object rather than a URL string; replace the default implementation so
it returns a harmless placeholder URL string (e.g.
"https://example.com/placeholder.png") instead of str; alternatively, if you
prefer the Optional approach, change the type to Optional[Callable[[str, str,
str, str], str]] = None and handle the fallback where upload_base64_image is
invoked (e.g., in span_utils) to return a placeholder URL when None.

Original file line number Diff line number Diff line change
@@ -1,22 +1,103 @@
import json
import base64
import logging
import asyncio
from opentelemetry.instrumentation.google_generativeai.utils import (
dont_throw,
should_send_prompts,
)
from opentelemetry.instrumentation.google_generativeai.config import Config
from opentelemetry.semconv_ai import (
SpanAttributes,
)
from opentelemetry.trace.status import Status, StatusCode


logger = logging.getLogger(__name__)


def _set_span_attribute(span, name, value):
if value is not None:
if value != "":
span.set_attribute(name, value)
return


def _is_image_part(item):
"""Check if item is a Google GenAI Part object containing image data"""
try:
# Check if it has the Part attributes we expect for new Google GenAI SDK
if hasattr(item, 'inline_data') and item.inline_data is not None:
# Check if it's an image mime type and has data
if (hasattr(item.inline_data, 'mime_type') and
item.inline_data.mime_type and
'image/' in item.inline_data.mime_type and
hasattr(item.inline_data, 'data') and
item.inline_data.data):
return True
return False
except Exception:
return False
Comment thread
nirga marked this conversation as resolved.


async def _process_image_part(item, trace_id, span_id, content_index):
"""Process a Google GenAI Part object containing image data"""
if not Config.upload_base64_image:
return None

try:
# Extract format from mime type (e.g., 'image/jpeg' -> 'jpeg')
image_format = item.inline_data.mime_type.split('/')[1] if item.inline_data.mime_type else 'unknown'
image_name = f"content_{content_index}.{image_format}"

# Convert binary data to base64 string for upload
binary_data = item.inline_data.data
base64_string = base64.b64encode(binary_data).decode('utf-8')

# Upload the base64 data
url = await Config.upload_base64_image(trace_id, span_id, image_name, base64_string)

# Return OpenAI-compatible format for consistency across LLM providers
return {
"type": "image_url",
"image_url": {"url": url}
}
except Exception as e:
logger.warning(f"Failed to process image part: {e}")
# Return None to skip adding this image to the span
return None
Comment on lines +66 to +69

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Return a configurable fallback image URL on upload failure (aligns with PR intent and prior review)

On failures you currently return None, which drops the image entirely. PR text claims “graceful fallback URLs when uploads fail.” Use configurable fallbacks instead of dropping the item.

     except Exception as e:
         logger.warning(f"Failed to process image part: {e}")
-        # Return None to skip adding this image to the span
-        return None
+        # Graceful fallback
+        fallback_url = getattr(
+            Config,
+            "fallback_async_image_url",
+            getattr(Config, "fallback_image_url", None),
+        )
+        if fallback_url:
+            return {"type": "image_url", "image_url": {"url": fallback_url}}
+        return None
     except Exception as e:
         logger.warning(f"Failed to process image part sync: {e}")
-        # Return None to skip adding this image to the span
-        return None
+        # Graceful fallback
+        fallback_url = getattr(
+            Config,
+            "fallback_sync_image_url",
+            getattr(Config, "fallback_image_url", None),
+        )
+        if fallback_url:
+            return {"type": "image_url", "image_url": {"url": fallback_url}}
+        return None

Note: This keeps fallback configuration centralized in Config (avoids hard-coded URLs). It also addresses the earlier feedback about making fallbacks configurable rather than hard-coded.

Also applies to: 92-95

🤖 Prompt for AI Agents
In
packages/opentelemetry-instrumentation-google-generativeai/opentelemetry/instrumentation/google_generativeai/span_utils.py
around lines 65-68 (and similarly lines 92-95) the current except blocks return
None which drops the image; update them to return a configurable fallback image
URL from the project's Config instead of None. Replace the None return with
something like Config.get().fallback_image_url (or the appropriate Config
accessor used elsewhere), ensure Config has a fallback_image_url entry (with
sensible default), and add a brief debug log noting the fallback was used so the
span still contains an image URL when upload processing fails.


Comment thread
coderabbitai[bot] marked this conversation as resolved.

def _process_image_part_sync(item, trace_id, span_id, content_index):
"""Synchronous version of image part processing"""
if not Config.upload_base64_image:
return None

try:
# Extract format from mime type (e.g., 'image/jpeg' -> 'jpeg')
image_format = item.inline_data.mime_type.split('/')[1] if item.inline_data.mime_type else 'unknown'
image_name = f"content_{content_index}.{image_format}"

# Convert binary data to base64 string for upload
binary_data = item.inline_data.data
base64_string = base64.b64encode(binary_data).decode('utf-8')

# Use asyncio.run to call the async upload function in sync context
url = asyncio.run(Config.upload_base64_image(trace_id, span_id, image_name, base64_string))

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removing the try/except block around asyncio.run() may lead to RuntimeError if an event loop is already running. Consider reintroducing proper error handling or using a workaround for nested event loops.


return {
"type": "image_url",
"image_url": {"url": url}
}
except Exception as e:
logger.warning(f"Failed to process image part sync: {e}")
# Return None to skip adding this image to the span
return None


@dont_throw
def set_input_attributes(span, args, kwargs, llm_model):
async def set_input_attributes(span, args, kwargs, llm_model):
"""Process input arguments, handling both text and image content"""
Comment on lines 204 to +206

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

dont_throw on an async function won’t catch in-coroutine exceptions

@dont_throw as implemented in utils.py is synchronous and won’t catch exceptions raised inside this async def set_input_attributes(...). This silently bypasses the intended shielding/logging.

Two viable fixes (A preferred):

  • A) Update the decorator in utils.py to handle coroutine functions.
  • B) Remove the decorator here and add an internal try/except.

A) Example (utils.py):

import functools, inspect, logging, traceback

def dont_throw(func):
    logger = logging.getLogger(func.__module__)

    if inspect.iscoroutinefunction(func):
        @functools.wraps(func)
        async def async_wrapper(*args, **kwargs):
            try:
                return await func(*args, **kwargs)
            except Exception as e:
                logger.debug(
                    "OpenLLMetry failed to trace in %s, error: %s",
                    func.__name__,
                    traceback.format_exc(),
                )
                if Config.exception_logger:
                    Config.exception_logger(e)
        return async_wrapper

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            logger.debug(
                "OpenLLMetry failed to trace in %s, error: %s",
                func.__name__,
                traceback.format_exc(),
            )
            if Config.exception_logger:
                Config.exception_logger(e)
    return wrapper

Would you like me to open a follow-up PR to implement (A) and sweep for other async uses of @dont_throw?

if not span.is_recording():
Comment on lines 204 to 207

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

dont_throw wraps an async function incorrectly — exceptions inside the coroutine won’t be caught

The current dont_throw decorator is synchronous and won’t catch exceptions raised inside async def set_input_attributes(...). This silently disables the intended error shielding/logging for this async path.

Two viable fixes (prefer A for reuse):

  • A) Make the decorator support coroutine functions globally (utils.py). Keep the decorator here unchanged.
  • B) Remove the decorator from the async function and handle try/except inside the function body.

A) Update decorator (utils.py), then keep using @dont_throw here:

# utils.py
import functools, inspect, logging, traceback

def dont_throw(func):
    logger = logging.getLogger(func.__module__)

    if inspect.iscoroutinefunction(func):
        @functools.wraps(func)
        async def async_wrapper(*args, **kwargs):
            try:
                return await func(*args, **kwargs)
            except Exception as e:
                logger.debug(
                    "OpenLLMetry failed to trace in %s, error: %s",
                    func.__name__,
                    traceback.format_exc(),
                )
                if Config.exception_logger:
                    Config.exception_logger(e)
        return async_wrapper

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            logger.debug(
                "OpenLLMetry failed to trace in %s, error: %s",
                func.__name__,
                traceback.format_exc(),
            )
            if Config.exception_logger:
                Config.exception_logger(e)
    return wrapper

B) Alternatively, drop the decorator here and wrap the function body with try/except.

🤖 Prompt for AI Agents
packages/opentelemetry-instrumentation-google-generativeai/opentelemetry/instrumentation/google_generativeai/span_utils.py
around lines 98-101: the dont_throw decorator is synchronous so exceptions
raised inside this async function won’t be caught; fix by updating dont_throw in
utils.py to detect coroutine functions (inspect.iscoroutinefunction) and return
an async wrapper that awaits the function and catches/logs exceptions
(preferred, then keep @dont_throw here), or as an alternative remove the
decorator here and add an internal try/except in set_input_attributes to
catch/log exceptions.

Comment on lines +205 to 207

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Signature change to async: audit call sites to ensure proper awaiting or use of the sync wrapper

set_input_attributes is now async. Ensure instrumentors call await set_input_attributes(...) or use set_input_attributes_sync(...) in synchronous code paths.

Run:

Expected: All real invocations in async contexts are awaited; sync contexts use set_input_attributes_sync.

Also applies to: 212-216


🏁 Script executed:

#!/bin/bash
# Find all call sites
rg -nP --type=py -C2 '\bset_input_attributes\s*\('

echo
echo "Calls that are awaited:"
rg -nP --type=py -C2 'await\s+set_input_attributes\s*\('

echo
echo "Suspicious un-awaited calls (may be false positives if they call the _sync version):"
rg -nP --type=py -C2 '(?<!await\s)\bset_input_attributes\s*\(' | rg -v '_sync\s*\('

Length of output: 17620


🏁 Script executed:

#!/bin/bash
# Display the head of the async span_utils implementation
sed -n '1,200p' packages/opentelemetry-instrumentation-google-generativeai/opentelemetry/instrumentation/google_generativeai/span_utils.py

echo
echo "— Sync wrapper definitions in span_utils.py —"
rg -nP --type=py '_sync' -n packages/opentelemetry-instrumentation-google-generativeai/opentelemetry/instrumentation/google_generativeai/span_utils.py

echo
echo "— Calls to async set_input_attributes in __init__.py —"
rg -nP --type=py -C2 '\bset_input_attributes\s*\(' packages/opentelemetry-instrumentation-google-generativeai/opentelemetry/instrumentation/google_generativeai/__init__.py

echo
echo "— Calls to sync set_input_attributes_sync in __init__.py —"
rg -nP --type=py -C2 '\bset_input_attributes_sync\s*\(' packages/opentelemetry-instrumentation-google-generativeai/opentelemetry/instrumentation/google_generativeai/__init__.py

Length of output: 9700


🏁 Script executed:

#!/bin/bash
# Show the first 200 lines of the google_generativeai instrumentation __init__.py for context
sed -n '1,200p' packages/opentelemetry-instrumentation-google-generativeai/opentelemetry/instrumentation/google_generativeai/__init__.py

Length of output: 5948


Fix async wrapper to use the async set_input_attributes instead of the sync shim

The async instrumentation in
packages/opentelemetry-instrumentation-google-generativeai/opentelemetry/instrumentation/google_generativeai/__init__.py
still calls the synchronous helper (_handle_requestset_input_attributes_sync), which invokes asyncio.run(...) inside an already-running event loop. In a live async context this will raise RuntimeError: asyncio.run() cannot be called from a running event loop, breaking prompt-attribute recording.

• Location: async wrapper _awrap (around line 155)
• Current code:

@_with_tracer_wrapper
async def _awrap(...):
    ...
    _handle_request(span, args, kwargs, llm_model, event_logger)
    response = await wrapped(*args, **kwargs)
    ...

• Proposed change: call the async setter directly (and preserve the dont_throw behavior):

-    _handle_request(span, args, kwargs, llm_model, event_logger)
+    # use async setter in async contexts
+    try:
+        if should_emit_events() and event_logger:
+            emit_message_events(args, kwargs, event_logger)
+        else:
+            await set_input_attributes(span, args, kwargs, llm_model)
+    except Exception:
+        # swallow per @dont_throw contract
+        pass
     response = await wrapped(*args, **kwargs)

• Remove or deprecate _handle_request for async use, or split into separate sync/async handlers.
• Verify the sync wrapper (_handle_requestset_input_attributes_sync) remains used in the sync _wrap.

This change ensures that in async flows we properly await set_input_attributes, and only use the sync shim in purely synchronous instrumentation.

return

Expand All @@ -26,53 +107,219 @@ def set_input_attributes(span, args, kwargs, llm_model):
if "contents" in kwargs:
contents = kwargs["contents"]
if isinstance(contents, str):
# Simple string content in OpenAI format
_set_span_attribute(
span,
f"{SpanAttributes.LLM_PROMPTS}.0.content",
contents,
json.dumps([{"type": "text", "text": contents}]),
)
_set_span_attribute(
span,
f"{SpanAttributes.LLM_PROMPTS}.0.role",
"user",
)
elif isinstance(contents, list):
# Process content list - could be mixed text and Part objects
for i, content in enumerate(contents):
processed_content = []

if hasattr(content, "parts"):
for part in content.parts:
if hasattr(part, "text"):
_set_span_attribute(
span,
f"{SpanAttributes.LLM_PROMPTS}.{i}.content",
part.text,
)
_set_span_attribute(
span,
f"{SpanAttributes.LLM_PROMPTS}.{i}.role",
getattr(content, "role", "user"),
# Content with parts (Google GenAI Content object)
Comment thread
nirga marked this conversation as resolved.
Outdated
for j, part in enumerate(content.parts):
if hasattr(part, "text") and part.text:
processed_content.append({"type": "text", "text": part.text})
elif _is_image_part(part):
processed_image = await _process_image_part(
part, span.context.trace_id, span.context.span_id, j
)
if processed_image is not None:
processed_content.append(processed_image)
else:
# Other part types
processed_content.append({"type": "text", "text": str(part)})
elif isinstance(content, str):
# Direct string in the list
processed_content.append({"type": "text", "text": content})
elif _is_image_part(content):
# Direct Part object that's an image
processed_image = await _process_image_part(
content, span.context.trace_id, span.context.span_id, 0
)
if processed_image is not None:
processed_content.append(processed_image)
else:
# Other content types
processed_content.append({"type": "text", "text": str(content)})

if processed_content:
_set_span_attribute(
span,
f"{SpanAttributes.LLM_PROMPTS}.{i}.content",
json.dumps(processed_content),
)
_set_span_attribute(
span,
f"{SpanAttributes.LLM_PROMPTS}.{i}.role",
getattr(content, "role", "user"),
)
elif args and len(args) > 0:
prompt = ""
for arg in args:
# Handle args - process each argument
for i, arg in enumerate(args):
processed_content = []

if isinstance(arg, str):
prompt = f"{prompt}{arg}\n"
processed_content.append({"type": "text", "text": arg})
elif isinstance(arg, list):
for subarg in arg:
prompt = f"{prompt}{subarg}\n"
if prompt:
for j, subarg in enumerate(arg):
Comment thread
nirga marked this conversation as resolved.
Outdated
if isinstance(subarg, str):
processed_content.append({"type": "text", "text": subarg})
elif _is_image_part(subarg):
processed_image = await _process_image_part(
subarg, span.context.trace_id, span.context.span_id, j
)
if processed_image is not None:
processed_content.append(processed_image)
else:
processed_content.append({"type": "text", "text": str(subarg)})
elif _is_image_part(arg):
processed_image = await _process_image_part(
arg, span.context.trace_id, span.context.span_id, 0
)
if processed_image is not None:
processed_content.append(processed_image)
else:
processed_content.append({"type": "text", "text": str(arg)})

if processed_content:
_set_span_attribute(
span,
f"{SpanAttributes.LLM_PROMPTS}.{i}.content",
json.dumps(processed_content),
)
_set_span_attribute(
span,
f"{SpanAttributes.LLM_PROMPTS}.{i}.role",
"user",
)
elif "prompt" in kwargs:
_set_span_attribute(
span, f"{SpanAttributes.LLM_PROMPTS}.0.content",
json.dumps([{"type": "text", "text": kwargs["prompt"]}])
)
_set_span_attribute(span, f"{SpanAttributes.LLM_PROMPTS}.0.role", "user")


# Keep sync version for backward compatibility
@dont_throw
def set_input_attributes_sync(span, args, kwargs, llm_model):
"""Synchronous version with image processing support"""
if not span.is_recording():
return

if not should_send_prompts():
return

if "contents" in kwargs:
contents = kwargs["contents"]
if isinstance(contents, str):
# Simple string content in OpenAI format
_set_span_attribute(
span,
f"{SpanAttributes.LLM_PROMPTS}.0.content",
prompt,
json.dumps([{"type": "text", "text": contents}]),
)
_set_span_attribute(
span,
f"{SpanAttributes.LLM_PROMPTS}.0.role",
"user",
)
elif isinstance(contents, list):
Comment thread
nirga marked this conversation as resolved.
# Process content list - could be mixed text and Part objects
for i, content in enumerate(contents):
processed_content = []

if hasattr(content, "parts"):
# Content with parts (Google GenAI Content object)
for j, part in enumerate(content.parts):
if hasattr(part, "text") and part.text:
processed_content.append({"type": "text", "text": part.text})
elif _is_image_part(part):
processed_image = _process_image_part_sync(
part, span.context.trace_id, span.context.span_id, j
)
if processed_image is not None:
processed_content.append(processed_image)
else:
# Other part types
processed_content.append({"type": "text", "text": str(part)})
elif isinstance(content, str):
# Direct string in the list
processed_content.append({"type": "text", "text": content})
elif _is_image_part(content):
# Direct Part object that's an image
processed_image = _process_image_part_sync(
content, span.context.trace_id, span.context.span_id, 0
)
if processed_image is not None:
processed_content.append(processed_image)
else:
# Other content types
processed_content.append({"type": "text", "text": str(content)})

if processed_content:
_set_span_attribute(
span,
f"{SpanAttributes.LLM_PROMPTS}.{i}.content",
json.dumps(processed_content),
)
_set_span_attribute(
span,
f"{SpanAttributes.LLM_PROMPTS}.{i}.role",
getattr(content, "role", "user"),
)
elif args and len(args) > 0:
# Handle args - process each argument
for i, arg in enumerate(args):
processed_content = []

if isinstance(arg, str):
processed_content.append({"type": "text", "text": arg})
elif isinstance(arg, list):
for j, subarg in enumerate(arg):
if isinstance(subarg, str):
processed_content.append({"type": "text", "text": subarg})
elif _is_image_part(subarg):
processed_image = _process_image_part_sync(
subarg, span.context.trace_id, span.context.span_id, j
)
if processed_image is not None:
processed_content.append(processed_image)
else:
processed_content.append({"type": "text", "text": str(subarg)})
elif _is_image_part(arg):
processed_image = _process_image_part_sync(
arg, span.context.trace_id, span.context.span_id, 0
)
if processed_image is not None:
processed_content.append(processed_image)
else:
processed_content.append({"type": "text", "text": str(arg)})

if processed_content:
_set_span_attribute(
span,
f"{SpanAttributes.LLM_PROMPTS}.{i}.content",
json.dumps(processed_content),
)
_set_span_attribute(
span,
f"{SpanAttributes.LLM_PROMPTS}.{i}.role",
"user",
)
elif "prompt" in kwargs:
_set_span_attribute(
span, f"{SpanAttributes.LLM_PROMPTS}.0.content", kwargs["prompt"]
span, f"{SpanAttributes.LLM_PROMPTS}.0.content",
json.dumps([{"type": "text", "text": kwargs["prompt"]}])
)
_set_span_attribute(span, f"{SpanAttributes.LLM_PROMPTS}.0.role", "user")

Expand Down
Loading
Loading