diff --git a/packages/opentelemetry-instrumentation-google-generativeai/opentelemetry/instrumentation/google_generativeai/__init__.py b/packages/opentelemetry-instrumentation-google-generativeai/opentelemetry/instrumentation/google_generativeai/__init__.py
index dc915e9ad7..466441fe63 100644
--- a/packages/opentelemetry-instrumentation-google-generativeai/opentelemetry/instrumentation/google_generativeai/__init__.py
+++ b/packages/opentelemetry-instrumentation-google-generativeai/opentelemetry/instrumentation/google_generativeai/__init__.py
@@ -13,7 +13,7 @@
     emit_message_events,
 )
 from opentelemetry.instrumentation.google_generativeai.span_utils import (
-    set_input_attributes,
+    set_input_attributes_sync,
     set_model_request_attributes,
     set_model_response_attributes,
     set_response_attributes,
@@ -103,7 +103,7 @@ def _handle_request(span, args, kwargs, llm_model, event_logger):
     if should_emit_events() and event_logger:
         emit_message_events(args, kwargs, event_logger)
     else:
-        set_input_attributes(span, args, kwargs, llm_model)
+        set_input_attributes_sync(span, args, kwargs, llm_model)
     set_model_request_attributes(span, kwargs, llm_model)
 
@@ -249,10 +249,12 @@ def _wrap(
 class GoogleGenerativeAiInstrumentor(BaseInstrumentor):
     """An instrumentor for Google Generative AI's client library."""
 
-    def __init__(self, exception_logger=None, use_legacy_attributes=True):
+    def __init__(self, exception_logger=None, use_legacy_attributes=True, upload_base64_image=None):
         super().__init__()
         Config.exception_logger = exception_logger
         Config.use_legacy_attributes = use_legacy_attributes
+        if upload_base64_image:
+            Config.upload_base64_image = upload_base64_image
 
     def instrumentation_dependencies(self) -> Collection[str]:
         return ("google-genai >= 1.0.0",)
diff --git a/packages/opentelemetry-instrumentation-google-generativeai/opentelemetry/instrumentation/google_generativeai/config.py b/packages/opentelemetry-instrumentation-google-generativeai/opentelemetry/instrumentation/google_generativeai/config.py
index 44199c038c..93ad99eb9f 100644
--- a/packages/opentelemetry-instrumentation-google-generativeai/opentelemetry/instrumentation/google_generativeai/config.py
+++ b/packages/opentelemetry-instrumentation-google-generativeai/opentelemetry/instrumentation/google_generativeai/config.py
@@ -1,3 +1,9 @@
+from typing import Callable
+
+
 class Config:
     exception_logger = None
     use_legacy_attributes = True
+    upload_base64_image: Callable[[str, str, str, str], str] = (
+        lambda trace_id, span_id, image_name, base64_string: str
+    )
diff --git a/packages/opentelemetry-instrumentation-google-generativeai/opentelemetry/instrumentation/google_generativeai/span_utils.py b/packages/opentelemetry-instrumentation-google-generativeai/opentelemetry/instrumentation/google_generativeai/span_utils.py
index 9879ed5136..a7ca0842b6 100644
--- a/packages/opentelemetry-instrumentation-google-generativeai/opentelemetry/instrumentation/google_generativeai/span_utils.py
+++ b/packages/opentelemetry-instrumentation-google-generativeai/opentelemetry/instrumentation/google_generativeai/span_utils.py
@@ -1,13 +1,22 @@
+import json
+import base64
+import logging
+import asyncio
+import threading
 from opentelemetry.instrumentation.google_generativeai.utils import (
     dont_throw,
     should_send_prompts,
 )
+from opentelemetry.instrumentation.google_generativeai.config import Config
 from opentelemetry.semconv_ai import (
     SpanAttributes,
 )
 from opentelemetry.trace.status import Status, StatusCode
 
+logger = logging.getLogger(__name__)
+
+
 def _set_span_attribute(span, name, value):
     if value is not None:
         if value != "":
@@ -15,8 +24,186 @@ def _set_span_attribute(span, name, value):
     return
 
 
+def _is_image_part(item):
+    """Check if item is a Google GenAI Part object containing image data"""
+    try:
+        # Check if it has the Part attributes we expect for new Google GenAI SDK
+        if hasattr(item, 'inline_data') and item.inline_data is not None:
+            # Check if it's an image mime type and has data
+            if (hasattr(item.inline_data, 'mime_type') and
+                    item.inline_data.mime_type and
+                    'image/' in item.inline_data.mime_type and
+                    hasattr(item.inline_data, 'data') and
+                    item.inline_data.data):
+                return True
+        return False
+    except Exception:
+        return False
+
+
+async def _process_image_part(item, trace_id, span_id, content_index):
+    """Process a Google GenAI Part object containing image data"""
+    if not Config.upload_base64_image:
+        return None
+
+    try:
+        # Extract format from mime type (e.g., 'image/jpeg' -> 'jpeg')
+        image_format = item.inline_data.mime_type.split('/')[1] if item.inline_data.mime_type else 'unknown'
+        image_name = f"content_{content_index}.{image_format}"
+
+        # Convert binary data to base64 string for upload
+        binary_data = item.inline_data.data
+        base64_string = base64.b64encode(binary_data).decode('utf-8')
+
+        # Upload the base64 data - convert IDs to strings
+        url = await Config.upload_base64_image(str(trace_id), str(span_id), image_name, base64_string)
+
+        # Return OpenAI-compatible format for consistency across LLM providers
+        return {
+            "type": "image_url",
+            "image_url": {"url": url}
+        }
+    except Exception as e:
+        logger.warning(f"Failed to process image part: {e}")
+        # Return None to skip adding this image to the span
+        return None
+
+
+def run_async(method):
+    """Handle async method in sync context, following OpenAI's battle-tested approach"""
+    try:
+        loop = asyncio.get_running_loop()
+    except RuntimeError:
+        loop = None
+
+    if loop and loop.is_running():
+        thread = threading.Thread(target=lambda: asyncio.run(method))
+        thread.start()
+        thread.join()
+    else:
+        asyncio.run(method)
+
+
+def _process_image_part_sync(item, trace_id, span_id, content_index):
+    """Synchronous version of image part processing using OpenAI's pattern"""
+    if not Config.upload_base64_image:
+        return None
+
+    try:
+        # Extract format from mime type (e.g., 'image/jpeg' -> 'jpeg')
+        image_format = item.inline_data.mime_type.split('/')[1] if item.inline_data.mime_type else 'unknown'
+        image_name = f"content_{content_index}.{image_format}"
+
+        # Convert binary data to base64 string for upload
+        binary_data = item.inline_data.data
+        base64_string = base64.b64encode(binary_data).decode('utf-8')
+
+        # Use OpenAI's run_async pattern to handle the async upload function
+        url = None
+
+        async def upload_task():
+            nonlocal url
+            url = await Config.upload_base64_image(str(trace_id), str(span_id), image_name, base64_string)
+
+        run_async(upload_task())
+
+        return {
+            "type": "image_url",
+            "image_url": {"url": url}
+        }
+    except Exception as e:
+        logger.warning(f"Failed to process image part sync: {e}")
+        # Return None to skip adding this image to the span
+        return None
+
+
+async def _process_content_item(content_item, span):
+    """Process a single content item, handling different types (Content objects, strings, Parts)"""
+    processed_content = []
+
+    if hasattr(content_item, "parts"):
+        # Content with parts (Google GenAI Content object)
+        for part_index, part in enumerate(content_item.parts):
+            processed_part = await _process_content_part(part, span, part_index)
+            if processed_part is not None:
+                processed_content.append(processed_part)
+    elif isinstance(content_item, str):
+        # Direct string in the list
+        processed_content.append({"type": "text", "text": content_item})
+    elif _is_image_part(content_item):
+        # Direct Part object that's an image
+        processed_image = await _process_image_part(
+            content_item, span.context.trace_id, span.context.span_id, 0
+        )
+        if processed_image is not None:
+            processed_content.append(processed_image)
+    else:
+        # Other content types
+        processed_content.append({"type": "text", "text": str(content_item)})
+
+    return processed_content
+
+
+async def _process_content_part(part, span, part_index):
+    """Process a single part within a Content object"""
+    if hasattr(part, "text") and part.text:
+        return {"type": "text", "text": part.text}
+    elif _is_image_part(part):
+        return await _process_image_part(
+            part, span.context.trace_id, span.context.span_id, part_index
+        )
+    else:
+        # Other part types
+        return {"type": "text", "text": str(part)}
+
+
+def _set_prompt_attributes(span, prompt_index, processed_content, content_item):
+    """Set span attributes for a processed prompt"""
+    _set_span_attribute(
+        span,
+        f"{SpanAttributes.LLM_PROMPTS}.{prompt_index}.content",
+        json.dumps(processed_content),
+    )
+    _set_span_attribute(
+        span,
+        f"{SpanAttributes.LLM_PROMPTS}.{prompt_index}.role",
+        getattr(content_item, "role", "user"),
+    )
+
+
+async def _process_argument(argument, span):
+    """Process a single argument from args list"""
+    processed_content = []
+
+    if isinstance(argument, str):
+        processed_content.append({"type": "text", "text": argument})
+    elif isinstance(argument, list):
+        for sub_index, sub_item in enumerate(argument):
+            if isinstance(sub_item, str):
+                processed_content.append({"type": "text", "text": sub_item})
+            elif _is_image_part(sub_item):
+                processed_image = await _process_image_part(
+                    sub_item, span.context.trace_id, span.context.span_id, sub_index
+                )
+                if processed_image is not None:
+                    processed_content.append(processed_image)
+            else:
+                processed_content.append({"type": "text", "text": str(sub_item)})
+    elif _is_image_part(argument):
+        processed_image = await _process_image_part(
+            argument, span.context.trace_id, span.context.span_id, 0
+        )
+        if processed_image is not None:
+            processed_content.append(processed_image)
+    else:
+        processed_content.append({"type": "text", "text": str(argument)})
+
+    return processed_content
+
+
 @dont_throw
-def set_input_attributes(span, args, kwargs, llm_model):
+async def set_input_attributes(span, args, kwargs, llm_model):
+    """Process input arguments, handling both text and image content"""
     if not span.is_recording():
         return
@@ -26,10 +213,11 @@
     if "contents" in kwargs:
         contents = kwargs["contents"]
         if isinstance(contents, str):
+            # Simple string content in OpenAI format
             _set_span_attribute(
                 span,
                 f"{SpanAttributes.LLM_PROMPTS}.0.content",
-                contents,
+                json.dumps([{"type": "text", "text": contents}]),
             )
             _set_span_attribute(
                 span,
@@ -37,42 +225,147 @@
                 "user",
             )
         elif isinstance(contents, list):
-            for i, content in enumerate(contents):
-                if hasattr(content, "parts"):
-                    for part in content.parts:
-                        if hasattr(part, "text"):
-                            _set_span_attribute(
-                                span,
-                                f"{SpanAttributes.LLM_PROMPTS}.{i}.content",
-                                part.text,
-                            )
-                    _set_span_attribute(
-                        span,
-                        f"{SpanAttributes.LLM_PROMPTS}.{i}.role",
-                        getattr(content, "role", "user"),
-                    )
+            # Process content list - could be mixed text and Part objects
+            for prompt_index, content_item in enumerate(contents):
+                processed_content = await _process_content_item(content_item, span)
+
+                if processed_content:
+                    _set_prompt_attributes(span, prompt_index, processed_content, content_item)
     elif args and len(args) > 0:
-        prompt = ""
-        for arg in args:
-            if isinstance(arg, str):
-                prompt = f"{prompt}{arg}\n"
-            elif isinstance(arg, list):
-                for subarg in arg:
-                    prompt = f"{prompt}{subarg}\n"
-        if prompt:
+        # Handle args - process each argument
+        for arg_index, argument in enumerate(args):
+            processed_content = await _process_argument(argument, span)
+
+            if processed_content:
+                _set_span_attribute(
+                    span,
+                    f"{SpanAttributes.LLM_PROMPTS}.{arg_index}.content",
+                    json.dumps(processed_content),
+                )
+                _set_span_attribute(
+                    span,
+                    f"{SpanAttributes.LLM_PROMPTS}.{arg_index}.role",
+                    "user",
+                )
+    elif "prompt" in kwargs:
+        _set_span_attribute(
+            span, f"{SpanAttributes.LLM_PROMPTS}.0.content",
+            json.dumps([{"type": "text", "text": kwargs["prompt"]}])
+        )
+        _set_span_attribute(span, f"{SpanAttributes.LLM_PROMPTS}.0.role", "user")
+
+
+# Keep sync version for backward compatibility
+@dont_throw
+def set_input_attributes_sync(span, args, kwargs, llm_model):
+    """Synchronous version with image processing support"""
+    if not span.is_recording():
+        return
+
+    if not should_send_prompts():
+        return
+
+    if "contents" in kwargs:
+        contents = kwargs["contents"]
+        if isinstance(contents, str):
+            # Simple string content in OpenAI format
             _set_span_attribute(
                 span,
                 f"{SpanAttributes.LLM_PROMPTS}.0.content",
-                prompt,
+                json.dumps([{"type": "text", "text": contents}]),
             )
             _set_span_attribute(
                 span,
                 f"{SpanAttributes.LLM_PROMPTS}.0.role",
                 "user",
             )
+        elif isinstance(contents, list):
+            # Process content list - could be mixed text and Part objects
+            for i, content in enumerate(contents):
+                processed_content = []
+
+                if hasattr(content, "parts"):
+                    # Content with parts (Google GenAI Content object)
+                    for j, part in enumerate(content.parts):
+                        if hasattr(part, "text") and part.text:
+                            processed_content.append({"type": "text", "text": part.text})
+                        elif _is_image_part(part):
+                            processed_image = _process_image_part_sync(
+                                part, span.context.trace_id, span.context.span_id, j
+                            )
+                            if processed_image is not None:
+                                processed_content.append(processed_image)
+                        else:
+                            # Other part types
+                            processed_content.append({"type": "text", "text": str(part)})
+                elif isinstance(content, str):
+                    # Direct string in the list
+                    processed_content.append({"type": "text", "text": content})
+                elif _is_image_part(content):
+                    # Direct Part object that's an image
+                    processed_image = _process_image_part_sync(
+                        content, span.context.trace_id, span.context.span_id, 0
+                    )
+                    if processed_image is not None:
+                        processed_content.append(processed_image)
+                else:
+                    # Other content types
+                    processed_content.append({"type": "text", "text": str(content)})
+
+                if processed_content:
+                    _set_span_attribute(
+                        span,
+                        f"{SpanAttributes.LLM_PROMPTS}.{i}.content",
+                        json.dumps(processed_content),
+                    )
+                    _set_span_attribute(
+                        span,
+                        f"{SpanAttributes.LLM_PROMPTS}.{i}.role",
+                        getattr(content, "role", "user"),
+                    )
+    elif args and len(args) > 0:
+        # Handle args - process each argument
+        for i, arg in enumerate(args):
+            processed_content = []
+
+            if isinstance(arg, str):
+                processed_content.append({"type": "text", "text": arg})
+            elif isinstance(arg, list):
+                for j, subarg in enumerate(arg):
+                    if isinstance(subarg, str):
+                        processed_content.append({"type": "text", "text": subarg})
+                    elif _is_image_part(subarg):
+                        processed_image = _process_image_part_sync(
+                            subarg, span.context.trace_id, span.context.span_id, j
+                        )
+                        if processed_image is not None:
+                            processed_content.append(processed_image)
+                    else:
+                        processed_content.append({"type": "text", "text": str(subarg)})
+            elif _is_image_part(arg):
+                processed_image = _process_image_part_sync(
+                    arg, span.context.trace_id, span.context.span_id, 0
+                )
+                if processed_image is not None:
+                    processed_content.append(processed_image)
+            else:
+                processed_content.append({"type": "text", "text": str(arg)})
+
+            if processed_content:
+                _set_span_attribute(
+                    span,
+                    f"{SpanAttributes.LLM_PROMPTS}.{i}.content",
+                    json.dumps(processed_content),
+                )
+                _set_span_attribute(
+                    span,
+                    f"{SpanAttributes.LLM_PROMPTS}.{i}.role",
+                    "user",
+                )
     elif "prompt" in kwargs:
         _set_span_attribute(
-            span, f"{SpanAttributes.LLM_PROMPTS}.0.content", kwargs["prompt"]
+            span, f"{SpanAttributes.LLM_PROMPTS}.0.content",
+            json.dumps([{"type": "text", "text": kwargs["prompt"]}])
         )
         _set_span_attribute(span, f"{SpanAttributes.LLM_PROMPTS}.0.role", "user")
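A note on wiring the hook above: upload_base64_image is a four-argument callable (trace_id, span_id, image_name, base64_string) returning a URL, awaited on the async path and driven through run_async on the sync path. A minimal sketch of registering one directly on the instrumentor follows; the storage backend and URL scheme are hypothetical, any coroutine returning a URL works.

from opentelemetry.instrumentation.google_generativeai import (
    GoogleGenerativeAiInstrumentor,
)


async def upload_base64_image(
    trace_id: str, span_id: str, image_name: str, base64_string: str
) -> str:
    # Hypothetical sink: persist the decoded image somewhere durable and
    # return its URL; the span then records an OpenAI-style image_url entry.
    return f"https://images.example.com/{trace_id}/{span_id}/{image_name}"


GoogleGenerativeAiInstrumentor(upload_base64_image=upload_base64_image).instrument()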
diff --git a/packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/chat_wrappers.py b/packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/chat_wrappers.py
index 38df9731a7..0b8c9d6b84 100644
--- a/packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/chat_wrappers.py
+++ b/packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/chat_wrappers.py
@@ -429,7 +429,8 @@ async def _process_image_item(item, trace_id, span_id, message_index, content_in
     image_format = item["image_url"]["url"].split(";")[0].split("/")[1]
     image_name = f"message_{message_index}_content_{content_index}.{image_format}"
     base64_string = item["image_url"]["url"].split(",")[1]
-    url = await Config.upload_base64_image(trace_id, span_id, image_name, base64_string)
+    # Convert trace_id and span_id to strings as expected by upload function
+    url = await Config.upload_base64_image(str(trace_id), str(span_id), image_name, base64_string)
 
     return {"type": "image_url", "image_url": {"url": url}}
diff --git a/packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/config.py b/packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/config.py
index 65cd450d78..aac2c7978c 100644
--- a/packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/config.py
+++ b/packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/config.py
@@ -7,8 +7,8 @@ class Config:
     enrich_assistant = False
     exception_logger = None
     get_common_metrics_attributes: Callable[[], dict] = lambda: {}
-    upload_base64_image: Callable[[str, str, str], str] = (
-        lambda trace_id, span_id, base64_image_url: str
+    upload_base64_image: Callable[[str, str, str, str], str] = (
+        lambda trace_id, span_id, image_name, base64_string: str
     )
     enable_trace_context_propagation: bool = True
     use_legacy_attributes = True
diff --git a/packages/opentelemetry-instrumentation-vertexai/opentelemetry/instrumentation/vertexai/__init__.py b/packages/opentelemetry-instrumentation-vertexai/opentelemetry/instrumentation/vertexai/__init__.py
index 67068e3f65..5290e27510 100644
--- a/packages/opentelemetry-instrumentation-vertexai/opentelemetry/instrumentation/vertexai/__init__.py
+++ b/packages/opentelemetry-instrumentation-vertexai/opentelemetry/instrumentation/vertexai/__init__.py
@@ -15,6 +15,7 @@
 )
 from opentelemetry.instrumentation.vertexai.span_utils import (
     set_input_attributes,
+    set_input_attributes_sync,
     set_model_input_attributes,
     set_model_response_attributes,
     set_response_attributes,
@@ -178,12 +179,12 @@ async def _abuild_from_streaming_response(span, event_logger, response, llm_mode
 
 
 @dont_throw
-def _handle_request(span, event_logger, args, kwargs, llm_model):
+async def _handle_request(span, event_logger, args, kwargs, llm_model):
     set_model_input_attributes(span, kwargs, llm_model)
     if should_emit_events():
         emit_prompt_events(args, event_logger)
     else:
-        set_input_attributes(span, args)
+        await set_input_attributes(span, args)
 
 
 def _handle_response(span, event_logger, response, llm_model):
@@ -223,6 +224,11 @@ async def _awrap(tracer, event_logger, to_wrap, wrapped, instance, args, kwargs)
         llm_model = instance._model_id
     if hasattr(instance, "_model_name"):
         llm_model = instance._model_name.replace("publishers/google/models/", "")
+    # For ChatSession, try to get model from the parent model object
+    if hasattr(instance, "_model") and hasattr(instance._model, "_model_name"):
+        llm_model = instance._model._model_name.replace("publishers/google/models/", "")
+    elif hasattr(instance, "_model") and hasattr(instance._model, "_model_id"):
+        llm_model = instance._model._model_id
 
     name = to_wrap.get("span_name")
     span = tracer.start_span(
@@ -234,7 +240,7 @@ async def _awrap(tracer, event_logger, to_wrap, wrapped, instance, args, kwargs)
         },
     )
 
-    _handle_request(span, event_logger, args, kwargs, llm_model)
+    await _handle_request(span, event_logger, args, kwargs, llm_model)
 
     response = await wrapped(*args, **kwargs)
 
@@ -267,6 +273,11 @@ def _wrap(tracer, event_logger, to_wrap, wrapped, instance, args, kwargs):
         llm_model = instance._model_id
     if hasattr(instance, "_model_name"):
         llm_model = instance._model_name.replace("publishers/google/models/", "")
+    # For ChatSession, try to get model from the parent model object
+    if hasattr(instance, "_model") and hasattr(instance._model, "_model_name"):
+        llm_model = instance._model._model_name.replace("publishers/google/models/", "")
+    elif hasattr(instance, "_model") and hasattr(instance._model, "_model_id"):
+        llm_model = instance._model._model_id
 
     name = to_wrap.get("span_name")
     span = tracer.start_span(
@@ -278,7 +289,12 @@ def _wrap(tracer, event_logger, to_wrap, wrapped, instance, args, kwargs):
         },
     )
 
-    _handle_request(span, event_logger, args, kwargs, llm_model)
+    # Inline the request handling with the sync attribute helper,
+    # since this wrapper cannot await _handle_request
+    set_model_input_attributes(span, kwargs, llm_model)
+    if should_emit_events():
+        emit_prompt_events(args, event_logger)
+    else:
+        set_input_attributes_sync(span, args)
 
     response = wrapped(*args, **kwargs)
 
@@ -301,10 +317,12 @@ def _wrap(tracer, event_logger, to_wrap, wrapped, instance, args, kwargs):
 class VertexAIInstrumentor(BaseInstrumentor):
     """An instrumentor for VertexAI's client library."""
 
-    def __init__(self, exception_logger=None, use_legacy_attributes=True):
+    def __init__(self, exception_logger=None, use_legacy_attributes=True, upload_base64_image=None):
         super().__init__()
         Config.exception_logger = exception_logger
         Config.use_legacy_attributes = use_legacy_attributes
+        if upload_base64_image:
+            Config.upload_base64_image = upload_base64_image
 
     def instrumentation_dependencies(self) -> Collection[str]:
         return _instruments
diff --git a/packages/opentelemetry-instrumentation-vertexai/opentelemetry/instrumentation/vertexai/config.py b/packages/opentelemetry-instrumentation-vertexai/opentelemetry/instrumentation/vertexai/config.py
index 44199c038c..93ad99eb9f 100644
--- a/packages/opentelemetry-instrumentation-vertexai/opentelemetry/instrumentation/vertexai/config.py
+++ b/packages/opentelemetry-instrumentation-vertexai/opentelemetry/instrumentation/vertexai/config.py
@@ -1,3 +1,9 @@
+from typing import Callable
+
+
 class Config:
     exception_logger = None
     use_legacy_attributes = True
+    upload_base64_image: Callable[[str, str, str, str], str] = (
+        lambda trace_id, span_id, image_name, base64_string: str
+    )
diff --git a/packages/opentelemetry-instrumentation-vertexai/opentelemetry/instrumentation/vertexai/span_utils.py b/packages/opentelemetry-instrumentation-vertexai/opentelemetry/instrumentation/vertexai/span_utils.py
index f12873887f..df2aa17f1c 100644
--- a/packages/opentelemetry-instrumentation-vertexai/opentelemetry/instrumentation/vertexai/span_utils.py
+++ b/packages/opentelemetry-instrumentation-vertexai/opentelemetry/instrumentation/vertexai/span_utils.py
@@ -1,7 +1,17 @@
+import copy
+import json
+import base64
+import logging
+import asyncio
+import threading
 from opentelemetry.instrumentation.vertexai.utils import dont_throw, should_send_prompts
+from opentelemetry.instrumentation.vertexai.config import Config
 from opentelemetry.semconv_ai import SpanAttributes
 
+logger = logging.getLogger(__name__)
+
+
 def _set_span_attribute(span, name, value):
     if value is not None:
         if value != "":
@@ -9,24 +19,225 @@ def _set_span_attribute(span, name, value):
     return
 
 
+def _is_base64_image_part(item):
+    """Check if item is a VertexAI Part object containing image data"""
+    try:
+        # Check if it has the Part attributes we expect
+        if not hasattr(item, 'inline_data') or not hasattr(item, 'mime_type'):
+            return False
+
+        # Check if it's an image mime type and has inline data
+        if item.mime_type and 'image/' in item.mime_type and item.inline_data:
+            # Check if the inline_data has actual data
+            if hasattr(item.inline_data, 'data') and item.inline_data.data:
+                return True
+
+        return False
+    except Exception:
+        return False
+
+
+async def _process_image_part(item, trace_id, span_id, content_index):
+    """Process a VertexAI Part object containing image data"""
+    if not Config.upload_base64_image:
+        return None
+
+    try:
+        # Extract format from mime type (e.g., 'image/jpeg' -> 'jpeg')
+        image_format = item.mime_type.split('/')[1] if item.mime_type else 'unknown'
+        image_name = f"content_{content_index}.{image_format}"
+
+        # Convert binary data to base64 string for upload
+        binary_data = item.inline_data.data
+        base64_string = base64.b64encode(binary_data).decode('utf-8')
+
+        # Upload the base64 data - convert IDs to strings
+        url = await Config.upload_base64_image(str(trace_id), str(span_id), image_name, base64_string)
+
+        # Return OpenAI-compatible format for consistency across LLM providers
+        return {
+            "type": "image_url",
+            "image_url": {"url": url}
+        }
+    except Exception as e:
+        logger.warning(f"Failed to process image part: {e}")
+        # Return None to skip adding this image to the span
+        return None
+
+
+def run_async(method):
+    """Handle async method in sync context, following OpenAI's battle-tested approach"""
+    try:
+        loop = asyncio.get_running_loop()
+    except RuntimeError:
+        loop = None
+
+    if loop and loop.is_running():
+        thread = threading.Thread(target=lambda: asyncio.run(method))
+        thread.start()
+        thread.join()
+    else:
+        asyncio.run(method)
+
+
+def _process_image_part_sync(item, trace_id, span_id, content_index):
+    """Synchronous version of image part processing using OpenAI's pattern"""
+    if not Config.upload_base64_image:
+        return None
+
+    try:
+        # Extract format from mime type (e.g., 'image/jpeg' -> 'jpeg')
+        image_format = item.mime_type.split('/')[1] if item.mime_type else 'unknown'
+        image_name = f"content_{content_index}.{image_format}"
+
+        # Convert binary data to base64 string for upload
+        binary_data = item.inline_data.data
+        base64_string = base64.b64encode(binary_data).decode('utf-8')
+
+        # Use OpenAI's run_async pattern to handle the async upload function
+        url = None
+
+        async def upload_task():
+            nonlocal url
+            url = await Config.upload_base64_image(str(trace_id), str(span_id), image_name, base64_string)
+
+        run_async(upload_task())
+
+        return {
+            "type": "image_url",
+            "image_url": {"url": url}
+        }
+    except Exception as e:
+        logger.warning(f"Failed to process image part sync: {e}")
+        # Return None to skip adding this image to the span
+        return None
+
+
+async def _process_vertexai_argument(argument, span):
+    """Process a single argument for VertexAI, handling different types"""
+    if isinstance(argument, str):
+        # Simple text argument in OpenAI format
+        return [{"type": "text", "text": argument}]
+
+    elif isinstance(argument, list):
+        # List of mixed content (text strings and Part objects) - deep copy and process
+        content_list = copy.deepcopy(argument)
+        processed_items = []
+
+        for item_index, content_item in enumerate(content_list):
+            processed_item = await _process_content_item_vertexai(content_item, span, item_index)
+            if processed_item is not None:
+                processed_items.append(processed_item)
+
+        return processed_items
+
+    else:
+        # Single Part object - convert to OpenAI format
+        processed_item = await _process_content_item_vertexai(argument, span, 0)
+        return [processed_item] if processed_item is not None else []
+
+
+async def _process_content_item_vertexai(content_item, span, item_index):
+    """Process a single content item for VertexAI"""
+    if isinstance(content_item, str):
+        # Convert text to OpenAI format
+        return {"type": "text", "text": content_item}
+
+    elif _is_base64_image_part(content_item):
+        # Process image part
+        return await _process_image_part(
+            content_item, span.context.trace_id, span.context.span_id, item_index
+        )
+
+    elif hasattr(content_item, 'text'):
+        # Text part to OpenAI format
+        return {"type": "text", "text": content_item.text}
+
+    else:
+        # Other types as text
+        return {"type": "text", "text": str(content_item)}
+
+
+def _process_vertexai_argument_sync(argument, span):
+    """Synchronous version of argument processing for VertexAI"""
+    if isinstance(argument, str):
+        # Simple text argument in OpenAI format
+        return [{"type": "text", "text": argument}]
+
+    elif isinstance(argument, list):
+        # List of mixed content (text strings and Part objects) - deep copy and process
+        content_list = copy.deepcopy(argument)
+        processed_items = []
+
+        for item_index, content_item in enumerate(content_list):
+            processed_item = _process_content_item_vertexai_sync(content_item, span, item_index)
+            if processed_item is not None:
+                processed_items.append(processed_item)
+
+        return processed_items
+
+    else:
+        # Single Part object - convert to OpenAI format
+        processed_item = _process_content_item_vertexai_sync(argument, span, 0)
+        return [processed_item] if processed_item is not None else []
+
+
+def _process_content_item_vertexai_sync(content_item, span, item_index):
+    """Synchronous version of content item processing for VertexAI"""
+    if isinstance(content_item, str):
+        # Convert text to OpenAI format
+        return {"type": "text", "text": content_item}
+
+    elif _is_base64_image_part(content_item):
+        # Process image part
+        return _process_image_part_sync(
+            content_item, span.context.trace_id, span.context.span_id, item_index
+        )
+
+    elif hasattr(content_item, 'text'):
+        # Text part to OpenAI format
+        return {"type": "text", "text": content_item.text}
+
+    else:
+        # Other types as text
+        return {"type": "text", "text": str(content_item)}
+
+
 @dont_throw
-def set_input_attributes(span, args):
+async def set_input_attributes(span, args):
+    """Process input arguments, handling both text and image content"""
     if not span.is_recording():
         return
 
     if should_send_prompts() and args is not None and len(args) > 0:
-        prompt = ""
-        for arg in args:
-            if isinstance(arg, str):
-                prompt = f"{prompt}{arg}\n"
-            elif isinstance(arg, list):
-                for subarg in arg:
-                    prompt = f"{prompt}{subarg}\n"
+        # Process each argument using extracted helper methods
+        for arg_index, argument in enumerate(args):
+            processed_content = await _process_vertexai_argument(argument, span)
 
-        _set_span_attribute(
-            span,
-            f"{SpanAttributes.LLM_PROMPTS}.0.user",
-            prompt,
-        )
+            if processed_content:
+                _set_span_attribute(
+                    span,
+                    f"{SpanAttributes.LLM_PROMPTS}.{arg_index}.content",
+                    json.dumps(processed_content)
+                )
+
+
+# Sync version with image processing support
+@dont_throw
+def set_input_attributes_sync(span, args):
+    """Synchronous version with image processing support"""
+    if not span.is_recording():
+        return
+
+    if should_send_prompts() and args is not None and len(args) > 0:
+        # Process each argument using extracted helper methods
+        for arg_index, argument in enumerate(args):
+            processed_content = _process_vertexai_argument_sync(argument, span)
+
+            if processed_content:
+                _set_span_attribute(
+                    span,
+                    f"{SpanAttributes.LLM_PROMPTS}.{arg_index}.content",
+                    json.dumps(processed_content)
+                )
 
 
 @dont_throw
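Both span_utils modules carry the same run_async bridge, so its behavior is worth pinning down: with no running event loop it simply calls asyncio.run; inside a running loop it runs the coroutine to completion on a short-lived thread and joins it, blocking until the upload finishes rather than re-entering the caller's loop. A small sketch of both paths, assuming only the import added in this diff:

import asyncio

from opentelemetry.instrumentation.vertexai.span_utils import run_async


async def upload():
    # Stands in for the upload_task() closures defined above.
    await asyncio.sleep(0.1)


# Case 1: no loop running (plain sync caller) - run_async falls through to asyncio.run.
run_async(upload())


async def main():
    # Case 2: loop already running - run_async spawns a thread and joins it.
    run_async(upload())


asyncio.run(main())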
"""Synchronous version of content item processing for VertexAI""" + if isinstance(content_item, str): + # Convert text to OpenAI format + return {"type": "text", "text": content_item} + + elif _is_base64_image_part(content_item): + # Process image part + return _process_image_part_sync( + content_item, span.context.trace_id, span.context.span_id, item_index + ) + + elif hasattr(content_item, 'text'): + # Text part to OpenAI format + return {"type": "text", "text": content_item.text} + + else: + # Other types as text + return {"type": "text", "text": str(content_item)} + + @dont_throw -def set_input_attributes(span, args): +async def set_input_attributes(span, args): + """Process input arguments, handling both text and image content""" if not span.is_recording(): return if should_send_prompts() and args is not None and len(args) > 0: - prompt = "" - for arg in args: - if isinstance(arg, str): - prompt = f"{prompt}{arg}\n" - elif isinstance(arg, list): - for subarg in arg: - prompt = f"{prompt}{subarg}\n" + # Process each argument using extracted helper methods + for arg_index, argument in enumerate(args): + processed_content = await _process_vertexai_argument(argument, span) - _set_span_attribute( - span, - f"{SpanAttributes.LLM_PROMPTS}.0.user", - prompt, - ) + if processed_content: + _set_span_attribute( + span, + f"{SpanAttributes.LLM_PROMPTS}.{arg_index}.content", + json.dumps(processed_content) + ) + + +# Sync version with image processing support +@dont_throw +def set_input_attributes_sync(span, args): + """Synchronous version with image processing support""" + if not span.is_recording(): + return + if should_send_prompts() and args is not None and len(args) > 0: + # Process each argument using extracted helper methods + for arg_index, argument in enumerate(args): + processed_content = _process_vertexai_argument_sync(argument, span) + + if processed_content: + _set_span_attribute( + span, + f"{SpanAttributes.LLM_PROMPTS}.{arg_index}.content", + json.dumps(processed_content) + ) @dont_throw diff --git a/packages/sample-app/sample_app/google_genai_image_example.py b/packages/sample-app/sample_app/google_genai_image_example.py new file mode 100644 index 0000000000..5884d5d3c4 --- /dev/null +++ b/packages/sample-app/sample_app/google_genai_image_example.py @@ -0,0 +1,92 @@ +#!/usr/bin/env python3 +""" +Sample app to test Google Generative AI with image support +""" + +import os +from dotenv import load_dotenv +from traceloop.sdk import Traceloop +from traceloop.sdk.decorators import workflow + +load_dotenv() + +# Initialize Traceloop SDK +Traceloop.init(app_name="google_genai_image_example") + +try: + # Use the new Google GenAI SDK + import google.genai as genai + from google.genai import types + + print("=== Google GenAI Image Example with Traceloop ===") + + # Initialize client + client = genai.Client(vertexai=True, project=os.environ.get("GOOGLE_CLOUD_PROJECT"), location="global") + + @workflow("describe_image_google_genai") + def describe_image_from_local_path(image_path: str) -> str: + """Describe an image using Google GenAI from a local file path""" + + # Load the image from local path + with open(image_path, "rb") as image_file: + image_data = image_file.read() + + # Determine mime type from file extension + if image_path.lower().endswith(('.png', '.PNG')): + mime_type = "image/png" + elif image_path.lower().endswith(('.jpg', '.jpeg', '.JPG', '.JPEG')): + mime_type = "image/jpeg" + elif image_path.lower().endswith(('.gif', '.GIF')): + mime_type = "image/gif" + elif 
image_path.lower().endswith(('.webp', '.WEBP')): + mime_type = "image/webp" + else: + mime_type = "image/jpeg" # Default fallback + + # Create image part using the new SDK structure + image_part = types.Part( + inline_data=types.Blob( + mime_type=mime_type, + data=image_data + ) + ) + + print(f"Created image part with mime_type: {image_part.inline_data.mime_type}") + print(f"Image data size: {len(image_part.inline_data.data)} bytes") + + # Create contents with text and image + contents = [ + "Describe what you see in this image in detail.", + image_part + ] + + try: + # Make API call + response = client.models.generate_content( + model='gemini-2.0-flash-exp', + contents=contents + ) + + print(f"Response: {response.text}") + return response.text + + except Exception as e: + print(f"API call failed: {e}") + return f"Error: {str(e)}" + + # Test with the elephant image + image_path = os.path.join(os.path.dirname(__file__), "..", "data", "vision", "elephant.jpeg") + + if os.path.exists(image_path): + print(f"\n📸 Testing with image: {image_path}") + result = describe_image_from_local_path(image_path) + print(f"\n🤖 Description: {result}") + else: + print(f"\n⚠️ Image not found at: {image_path}") + print("Please ensure the elephant.jpeg file exists in data/vision/") + +except ImportError as e: + print(f"Google GenAI SDK not available: {e}") + print("Please install: pip install google-genai") + +print("\n✅ Done testing Google GenAI image support with Traceloop!") diff --git a/packages/sample-app/sample_app/vertex_gemini_vision_example.py b/packages/sample-app/sample_app/vertex_gemini_vision_example.py new file mode 100644 index 0000000000..0b47b2bf2b --- /dev/null +++ b/packages/sample-app/sample_app/vertex_gemini_vision_example.py @@ -0,0 +1,116 @@ +import os +import dotenv +import vertexai +from traceloop.sdk import Traceloop +from traceloop.sdk.decorators import workflow +from vertexai.generative_models import GenerativeModel, Part + +dotenv.load_dotenv() + +Traceloop.init(app_name="gemini_vision_example") + +# Initialize Vertex AI +# You can set GOOGLE_CLOUD_PROJECT and GOOGLE_CLOUD_LOCATION environment variables +# or pass them directly to vertexai.init() +vertexai.init() + + +@workflow("describe_image_from_path") +def describe_image_from_local_path(image_path: str) -> str: + """Describe an image using Gemini model from a local file path""" + + model = GenerativeModel("gemini-2.5-flash") + + # Load the image from local path + with open(image_path, "rb") as image_file: + image_data = image_file.read() + + # Create image part from bytes + image_part = Part.from_data( + data=image_data, + mime_type="image/jpeg" + ) + + # Generate content with image and text prompt + response = model.generate_content([ + "Describe what you see in this image in detail. What is the main subject and what are they doing?", + image_part + ]) + + return response.text + + +@workflow("analyze_image_from_gcs") +def analyze_image_from_gcs(gcs_uri: str) -> str: + """Analyze an image using Gemini model from Google Cloud Storage URI""" + + model = GenerativeModel("gemini-2.5-flash") + + # Create image part from GCS URI + image_part = Part.from_uri( + uri=gcs_uri, + mime_type="image/jpeg" + ) + + # Generate content with image and text prompt + response = model.generate_content([ + "What objects do you see in this image? 
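For orientation, with an uploader configured, a run of the sample above should land the mixed prompt on the span roughly as follows (assuming SpanAttributes.LLM_PROMPTS resolves to the usual gen_ai.prompt prefix; the URL is whatever the uploader returned). Each top-level item in contents gets its own prompt index, serialized as an OpenAI-style content list:

gen_ai.prompt.0.content = '[{"type": "text", "text": "Describe what you see in this image in detail."}]'
gen_ai.prompt.0.role    = 'user'
gen_ai.prompt.1.content = '[{"type": "image_url", "image_url": {"url": "<uploader return value>"}}]'
gen_ai.prompt.1.role    = 'user'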
diff --git a/packages/sample-app/sample_app/vertex_gemini_vision_example.py b/packages/sample-app/sample_app/vertex_gemini_vision_example.py
new file mode 100644
index 0000000000..0b47b2bf2b
--- /dev/null
+++ b/packages/sample-app/sample_app/vertex_gemini_vision_example.py
@@ -0,0 +1,116 @@
+import os
+import dotenv
+import vertexai
+from traceloop.sdk import Traceloop
+from traceloop.sdk.decorators import workflow
+from vertexai.generative_models import GenerativeModel, Part
+
+dotenv.load_dotenv()
+
+Traceloop.init(app_name="gemini_vision_example")
+
+# Initialize Vertex AI
+# You can set GOOGLE_CLOUD_PROJECT and GOOGLE_CLOUD_LOCATION environment variables
+# or pass them directly to vertexai.init()
+vertexai.init()
+
+
+@workflow("describe_image_from_path")
+def describe_image_from_local_path(image_path: str) -> str:
+    """Describe an image using Gemini model from a local file path"""
+
+    model = GenerativeModel("gemini-2.5-flash")
+
+    # Load the image from local path
+    with open(image_path, "rb") as image_file:
+        image_data = image_file.read()
+
+    # Create image part from bytes
+    image_part = Part.from_data(
+        data=image_data,
+        mime_type="image/jpeg"
+    )
+
+    # Generate content with image and text prompt
+    response = model.generate_content([
+        "Describe what you see in this image in detail. What is the main subject and what are they doing?",
+        image_part
+    ])
+
+    return response.text
+
+
+@workflow("analyze_image_from_gcs")
+def analyze_image_from_gcs(gcs_uri: str) -> str:
+    """Analyze an image using Gemini model from Google Cloud Storage URI"""
+
+    model = GenerativeModel("gemini-2.5-flash")
+
+    # Create image part from GCS URI
+    image_part = Part.from_uri(
+        uri=gcs_uri,
+        mime_type="image/jpeg"
+    )
+
+    # Generate content with image and text prompt
+    response = model.generate_content([
+        "What objects do you see in this image? List them and describe their characteristics.",
+        image_part
+    ])
+
+    return response.text
+
+
+@workflow("multi_turn_vision_chat")
+def multi_turn_vision_chat(image_path: str) -> str:
+    """Have a multi-turn conversation about an image"""
+
+    model = GenerativeModel("gemini-2.5-flash")
+
+    # Load the image from local path
+    with open(image_path, "rb") as image_file:
+        image_data = image_file.read()
+
+    # Create image part from bytes
+    image_part = Part.from_data(
+        data=image_data,
+        mime_type="image/jpeg"
+    )
+
+    # Start a chat session
+    chat = model.start_chat()
+
+    # First turn with image
+    response1 = chat.send_message([
+        "What animal do you see in this image?",
+        image_part
+    ])
+
+    # Follow-up questions without image (continues the conversation)
+    response2 = chat.send_message("What is the natural habitat of this animal?")
+    response3 = chat.send_message("What are some interesting facts about this animal?")
+
+    # Return the final response
+    return f"First: {response1.text}\nSecond: {response2.text}\nThird: {response3.text}"
+
+
+if __name__ == "__main__":
+    # Path to the sample image
+    image_path = "data/vision/elephant.jpeg"
+
+    # Check if the image exists
+    if os.path.exists(image_path):
+        print("=== Describing Image from Local Path ===")
+        result1 = describe_image_from_local_path(image_path)
+        print(result1)
+
+        print("\n=== Multi-turn Vision Chat ===")
+        result2 = multi_turn_vision_chat(image_path)
+        print(result2)
+    else:
+        print(f"Image not found at {image_path}")
+
+    # Example with GCS URI (uncomment if you have a GCS image)
+    # print("\n=== Analyzing Image from GCS ===")
+    # gcs_result = analyze_image_from_gcs("gs://your-bucket/your-image.jpg")
+    # print(gcs_result)
diff --git a/packages/traceloop-sdk/traceloop/sdk/tracing/tracing.py b/packages/traceloop-sdk/traceloop/sdk/tracing/tracing.py
index 7e56abf2aa..a800b985be 100644
--- a/packages/traceloop-sdk/traceloop/sdk/tracing/tracing.py
+++ b/packages/traceloop-sdk/traceloop/sdk/tracing/tracing.py
@@ -423,7 +423,7 @@ def init_tracer_provider(resource: Resource, sampler: Optional[Sampler] = None)
 
 def init_instrumentations(
     should_enrich_metrics: bool,
-    base64_image_uploader: Callable[[str, str, str], str],
+    base64_image_uploader: Callable[[str, str, str, str], str],
     instruments: Optional[Set[Instruments]] = None,
     block_instruments: Optional[Set[Instruments]] = None,
 ):
@@ -458,7 +458,7 @@ def init_instrumentations(
             if init_crewai_instrumentor():
                 instrument_set = True
         elif instrument == Instruments.GOOGLE_GENERATIVEAI:
-            if init_google_generativeai_instrumentor():
+            if init_google_generativeai_instrumentor(should_enrich_metrics, base64_image_uploader):
                 instrument_set = True
         elif instrument == Instruments.GROQ:
             if init_groq_instrumentor():
@@ -527,7 +527,9 @@ def init_instrumentations(
             if init_urllib3_instrumentor():
                 instrument_set = True
         elif instrument == Instruments.VERTEXAI:
-            if init_vertexai_instrumentor():
+            if init_vertexai_instrumentor(
+                should_enrich_metrics, base64_image_uploader
+            ):
                 instrument_set = True
         elif instrument == Instruments.WATSONX:
             if init_watsonx_instrumentor():
@@ -557,7 +559,7 @@ def init_instrumentations(
 
 
 def init_openai_instrumentor(
-    should_enrich_metrics: bool, base64_image_uploader: Callable[[str, str, str], str]
+    should_enrich_metrics: bool, base64_image_uploader: Callable[[str, str, str, str], str]
 ):
     try:
         if is_package_installed("openai"):
@@ -581,7 +583,7 @@ def init_openai_instrumentor(
 
 
 def init_anthropic_instrumentor(
-    should_enrich_metrics: bool, base64_image_uploader: Callable[[str, str, str], str]
+    should_enrich_metrics: bool, base64_image_uploader: Callable[[str, str, str, str], str]
 ):
     try:
         if is_package_installed("anthropic"):
@@ -675,7 +677,9 @@ def init_chroma_instrumentor():
     return False
 
 
-def init_google_generativeai_instrumentor():
+def init_google_generativeai_instrumentor(
+    should_enrich_metrics: bool, base64_image_uploader: Callable[[str, str, str, str], str]
+):
     try:
         if is_package_installed("google-generativeai") or is_package_installed("google-genai"):
             Telemetry().capture("instrumentation:gemini:init")
@@ -685,6 +689,7 @@ def init_google_generativeai_instrumentor(
 
             instrumentor = GoogleGenerativeAiInstrumentor(
                 exception_logger=lambda e: Telemetry().log_exception(e),
+                upload_base64_image=base64_image_uploader,
             )
             if not instrumentor.is_instrumented_by_opentelemetry:
                 instrumentor.instrument()
@@ -935,7 +940,9 @@ def init_replicate_instrumentor():
     return False
 
 
-def init_vertexai_instrumentor():
+def init_vertexai_instrumentor(
+    should_enrich_metrics: bool, base64_image_uploader: Callable[[str, str, str, str], str]
+):
     try:
         if is_package_installed("google-cloud-aiplatform"):
             Telemetry().capture("instrumentation:vertexai:init")
@@ -943,6 +950,7 @@ def init_vertexai_instrumentor(
 
             instrumentor = VertexAIInstrumentor(
                 exception_logger=lambda e: Telemetry().log_exception(e),
+                upload_base64_image=base64_image_uploader,
            )
             if not instrumentor.is_instrumented_by_opentelemetry:
                 instrumentor.instrument()
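End to end, the SDK threads the uploader from init_instrumentations down into each instrumentor's Config. A sketch of exercising that layer directly with the widened four-argument callback; the uploader body and storage URL are hypothetical:

from traceloop.sdk.tracing.tracing import init_instrumentations


async def base64_image_uploader(
    trace_id: str, span_id: str, image_name: str, base64_string: str
) -> str:
    # Hypothetical sink: decode and store base64_string, then return its URL.
    return f"https://storage.example.com/{trace_id}/{span_id}/{image_name}"


init_instrumentations(
    should_enrich_metrics=False,
    base64_image_uploader=base64_image_uploader,
)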