diff --git a/packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/__init__.py b/packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/__init__.py index 88140f2fc6..edaeae76f0 100644 --- a/packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/__init__.py +++ b/packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/__init__.py @@ -81,32 +81,32 @@ "method": "stream", "span_name": "anthropic.chat", }, - # # Beta API methods (regular Anthropic SDK) - # { - # "package": "anthropic.resources.beta.messages.messages", - # "object": "Messages", - # "method": "create", - # "span_name": "anthropic.chat", - # }, - # { - # "package": "anthropic.resources.beta.messages.messages", - # "object": "Messages", - # "method": "stream", - # "span_name": "anthropic.chat", - # }, - # # Beta API methods (Bedrock SDK) - # { - # "package": "anthropic.lib.bedrock._beta_messages", - # "object": "Messages", - # "method": "create", - # "span_name": "anthropic.chat", - # }, - # { - # "package": "anthropic.lib.bedrock._beta_messages", - # "object": "Messages", - # "method": "stream", - # "span_name": "anthropic.chat", - # }, + # Beta API methods (regular Anthropic SDK) + { + "package": "anthropic.resources.beta.messages.messages", + "object": "Messages", + "method": "create", + "span_name": "anthropic.chat", + }, + { + "package": "anthropic.resources.beta.messages.messages", + "object": "Messages", + "method": "stream", + "span_name": "anthropic.chat", + }, + # Beta API methods (Bedrock SDK) + { + "package": "anthropic.lib.bedrock._beta_messages", + "object": "Messages", + "method": "create", + "span_name": "anthropic.chat", + }, + { + "package": "anthropic.lib.bedrock._beta_messages", + "object": "Messages", + "method": "stream", + "span_name": "anthropic.chat", + }, ] WRAPPED_AMETHODS = [ @@ -122,32 +122,32 @@ "method": "create", "span_name": "anthropic.chat", }, - # # Beta API async methods (regular Anthropic SDK) - # { - # "package": "anthropic.resources.beta.messages.messages", - # "object": "AsyncMessages", - # "method": "create", - # "span_name": "anthropic.chat", - # }, - # { - # "package": "anthropic.resources.beta.messages.messages", - # "object": "AsyncMessages", - # "method": "stream", - # "span_name": "anthropic.chat", - # }, - # # Beta API async methods (Bedrock SDK) - # { - # "package": "anthropic.lib.bedrock._beta_messages", - # "object": "AsyncMessages", - # "method": "create", - # "span_name": "anthropic.chat", - # }, - # { - # "package": "anthropic.lib.bedrock._beta_messages", - # "object": "AsyncMessages", - # "method": "stream", - # "span_name": "anthropic.chat", - # }, + # Beta API async methods (regular Anthropic SDK) + { + "package": "anthropic.resources.beta.messages.messages", + "object": "AsyncMessages", + "method": "create", + "span_name": "anthropic.chat", + }, + { + "package": "anthropic.resources.beta.messages.messages", + "object": "AsyncMessages", + "method": "stream", + "span_name": "anthropic.chat", + }, + # Beta API async methods (Bedrock SDK) + { + "package": "anthropic.lib.bedrock._beta_messages", + "object": "AsyncMessages", + "method": "create", + "span_name": "anthropic.chat", + }, + { + "package": "anthropic.lib.bedrock._beta_messages", + "object": "AsyncMessages", + "method": "stream", + "span_name": "anthropic.chat", + }, ] @@ -182,14 +182,35 @@ async def _aset_token_usage( token_histogram: Histogram = None, choice_counter: Counter = None, ): - from opentelemetry.instrumentation.anthropic.utils import _aextract_response_data + import inspect + + # If we get a coroutine, await it + if inspect.iscoroutine(response): + try: + response = await response + except Exception as e: + import logging + logger = logging.getLogger(__name__) + logger.debug(f"Failed to await coroutine response: {e}") + return + + # Handle with_raw_response wrapped responses first + if response and hasattr(response, "parse") and callable(response.parse): + try: + response = response.parse() + except Exception as e: + import logging + logger = logging.getLogger(__name__) + logger.debug(f"Failed to parse with_raw_response: {e}") + return - response = await _aextract_response_data(response) + # Safely get usage attribute without extracting the whole object + usage = getattr(response, "usage", None) if response else None - if usage := response.get("usage"): - prompt_tokens = usage.input_tokens - cache_read_tokens = dict(usage).get("cache_read_input_tokens", 0) or 0 - cache_creation_tokens = dict(usage).get("cache_creation_input_tokens", 0) or 0 + if usage: + prompt_tokens = getattr(usage, "input_tokens", 0) + cache_read_tokens = getattr(usage, "cache_read_input_tokens", 0) or 0 + cache_creation_tokens = getattr(usage, "cache_creation_input_tokens", 0) or 0 else: prompt_tokens = await acount_prompt_tokens_from_request(anthropic, request) cache_read_tokens = 0 @@ -206,18 +227,18 @@ async def _aset_token_usage( }, ) - if usage := response.get("usage"): - completion_tokens = usage.output_tokens + if usage: + completion_tokens = getattr(usage, "output_tokens", 0) else: completion_tokens = 0 if hasattr(anthropic, "count_tokens"): - if response.get("completion"): + completion_attr = getattr(response, "completion", None) + content_attr = getattr(response, "content", None) + if completion_attr: + completion_tokens = await anthropic.count_tokens(completion_attr) + elif content_attr and len(content_attr) > 0: completion_tokens = await anthropic.count_tokens( - response.get("completion") - ) - elif response.get("content"): - completion_tokens = await anthropic.count_tokens( - response.get("content")[0].text + content_attr[0].text ) if ( @@ -236,9 +257,11 @@ async def _aset_token_usage( total_tokens = input_tokens + completion_tokens choices = 0 - if isinstance(response.get("content"), list): - choices = len(response.get("content")) - elif response.get("completion"): + content_attr = getattr(response, "content", None) + completion_attr = getattr(response, "completion", None) + if isinstance(content_attr, list): + choices = len(content_attr) + elif completion_attr: choices = 1 if choices > 0 and choice_counter: @@ -246,7 +269,7 @@ async def _aset_token_usage( choices, attributes={ **metric_attributes, - SpanAttributes.LLM_RESPONSE_STOP_REASON: response.get("stop_reason"), + SpanAttributes.LLM_RESPONSE_STOP_REASON: getattr(response, "stop_reason", None), }, ) @@ -276,14 +299,32 @@ def _set_token_usage( token_histogram: Histogram = None, choice_counter: Counter = None, ): - from opentelemetry.instrumentation.anthropic.utils import _extract_response_data + import inspect + + # If we get a coroutine, we cannot process it in sync context + if inspect.iscoroutine(response): + import logging + logger = logging.getLogger(__name__) + logger.warning(f"_set_token_usage received coroutine {response} - token usage processing skipped") + return + + # Handle with_raw_response wrapped responses first + if response and hasattr(response, "parse") and callable(response.parse): + try: + response = response.parse() + except Exception as e: + import logging + logger = logging.getLogger(__name__) + logger.debug(f"Failed to parse with_raw_response: {e}") + return - response = _extract_response_data(response) + # Safely get usage attribute without extracting the whole object + usage = getattr(response, "usage", None) if response else None - if usage := response.get("usage"): - prompt_tokens = usage.input_tokens - cache_read_tokens = dict(usage).get("cache_read_input_tokens", 0) or 0 - cache_creation_tokens = dict(usage).get("cache_creation_input_tokens", 0) or 0 + if usage: + prompt_tokens = getattr(usage, "input_tokens", 0) + cache_read_tokens = getattr(usage, "cache_read_input_tokens", 0) or 0 + cache_creation_tokens = getattr(usage, "cache_creation_input_tokens", 0) or 0 else: prompt_tokens = count_prompt_tokens_from_request(anthropic, request) cache_read_tokens = 0 @@ -300,16 +341,18 @@ def _set_token_usage( }, ) - if usage := response.get("usage"): - completion_tokens = usage.output_tokens + if usage: + completion_tokens = getattr(usage, "output_tokens", 0) else: completion_tokens = 0 if hasattr(anthropic, "count_tokens"): - if response.get("completion"): - completion_tokens = anthropic.count_tokens(response.get("completion")) - elif response.get("content"): + completion_attr = getattr(response, "completion", None) + content_attr = getattr(response, "content", None) + if completion_attr: + completion_tokens = anthropic.count_tokens(completion_attr) + elif content_attr and len(content_attr) > 0: completion_tokens = anthropic.count_tokens( - response.get("content")[0].text + content_attr[0].text ) if ( @@ -328,9 +371,11 @@ def _set_token_usage( total_tokens = input_tokens + completion_tokens choices = 0 - if isinstance(response.get("content"), list): - choices = len(response.get("content")) - elif response.get("completion"): + content_attr = getattr(response, "content", None) + completion_attr = getattr(response, "completion", None) + if isinstance(content_attr, list): + choices = len(content_attr) + elif completion_attr: choices = 1 if choices > 0 and choice_counter: @@ -338,7 +383,7 @@ def _set_token_usage( choices, attributes={ **metric_attributes, - SpanAttributes.LLM_RESPONSE_STOP_REASON: response.get("stop_reason"), + SpanAttributes.LLM_RESPONSE_STOP_REASON: getattr(response, "stop_reason", None), }, ) diff --git a/packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/utils.py b/packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/utils.py index e25a21ca91..48cc0dac8f 100644 --- a/packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/utils.py +++ b/packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/utils.py @@ -136,27 +136,78 @@ def _extract_response_data(response): @dont_throw async def ashared_metrics_attributes(response): - response = await _aextract_response_data(response) + import inspect + + # If we get a coroutine, await it + if inspect.iscoroutine(response): + try: + response = await response + except Exception as e: + import logging + logger = logging.getLogger(__name__) + logger.debug(f"Failed to await coroutine response: {e}") + response = None + + # If it's already a dict (e.g., from streaming), use it directly + if isinstance(response, dict): + model = response.get("model") + else: + # Handle with_raw_response wrapped responses first + if response and hasattr(response, "parse") and callable(response.parse): + try: + response = response.parse() + except Exception as e: + import logging + logger = logging.getLogger(__name__) + logger.debug(f"Failed to parse with_raw_response: {e}") + response = None + + # Safely get model attribute without extracting the whole object + model = getattr(response, "model", None) if response else None common_attributes = Config.get_common_metrics_attributes() return { **common_attributes, GEN_AI_SYSTEM: GEN_AI_SYSTEM_ANTHROPIC, - SpanAttributes.LLM_RESPONSE_MODEL: response.get("model"), + SpanAttributes.LLM_RESPONSE_MODEL: model, } @dont_throw def shared_metrics_attributes(response): - response = _extract_response_data(response) + import inspect + + # If we get a coroutine, we cannot process it in sync context + if inspect.iscoroutine(response): + import logging + logger = logging.getLogger(__name__) + logger.warning(f"shared_metrics_attributes received coroutine {response} - using None for model") + response = None + + # If it's already a dict (e.g., from streaming), use it directly + if isinstance(response, dict): + model = response.get("model") + else: + # Handle with_raw_response wrapped responses first + if response and hasattr(response, "parse") and callable(response.parse): + try: + response = response.parse() + except Exception as e: + import logging + logger = logging.getLogger(__name__) + logger.debug(f"Failed to parse with_raw_response: {e}") + response = None + + # Safely get model attribute without extracting the whole object + model = getattr(response, "model", None) if response else None common_attributes = Config.get_common_metrics_attributes() return { **common_attributes, GEN_AI_SYSTEM: GEN_AI_SYSTEM_ANTHROPIC, - SpanAttributes.LLM_RESPONSE_MODEL: response.get("model"), + SpanAttributes.LLM_RESPONSE_MODEL: model, } diff --git a/packages/opentelemetry-instrumentation-anthropic/tests/test_bedrock_with_raw_response.py b/packages/opentelemetry-instrumentation-anthropic/tests/test_bedrock_with_raw_response.py index 183743474e..fab8fe84d7 100644 --- a/packages/opentelemetry-instrumentation-anthropic/tests/test_bedrock_with_raw_response.py +++ b/packages/opentelemetry-instrumentation-anthropic/tests/test_bedrock_with_raw_response.py @@ -25,7 +25,7 @@ def async_anthropic_bedrock_client(instrument_legacy): ) -@pytest.mark.skip +# @pytest.mark.skip @pytest.mark.asyncio @pytest.mark.vcr async def test_async_anthropic_bedrock_with_raw_response( @@ -80,7 +80,7 @@ async def test_async_anthropic_bedrock_with_raw_response( ) -@pytest.mark.skip +# @pytest.mark.skip @pytest.mark.asyncio @pytest.mark.vcr async def test_async_anthropic_bedrock_regular_create( @@ -129,7 +129,7 @@ async def test_async_anthropic_bedrock_regular_create( ) -@pytest.mark.skip +# @pytest.mark.skip @pytest.mark.asyncio @pytest.mark.vcr async def test_async_anthropic_bedrock_beta_with_raw_response( diff --git a/packages/opentelemetry-instrumentation-bedrock/opentelemetry/instrumentation/bedrock/span_utils.py b/packages/opentelemetry-instrumentation-bedrock/opentelemetry/instrumentation/bedrock/span_utils.py index 13ddb05d60..901f59c9a1 100644 --- a/packages/opentelemetry-instrumentation-bedrock/opentelemetry/instrumentation/bedrock/span_utils.py +++ b/packages/opentelemetry-instrumentation-bedrock/opentelemetry/instrumentation/bedrock/span_utils.py @@ -12,7 +12,7 @@ SpanAttributes, ) -anthropic_client = anthropic.Anthropic() +anthropic_client = None def _set_span_attribute(span, name, value): @@ -273,9 +273,29 @@ def _set_anthropic_messages_span_attributes( def _count_anthropic_tokens(messages: list[str]): + global anthropic_client + + # Lazy initialization of the Anthropic client + if anthropic_client is None: + try: + anthropic_client = anthropic.Anthropic() + except Exception as e: + import logging + logger = logging.getLogger(__name__) + logger.debug(f"Failed to initialize Anthropic client for token counting: {e}") + # Return 0 if we can't create the client + return 0 + count = 0 - for message in messages: - count += anthropic_client.count_tokens(text=message) + try: + for message in messages: + count += anthropic_client.count_tokens(text=message) + except Exception as e: + import logging + logger = logging.getLogger(__name__) + logger.debug(f"Failed to count tokens with Anthropic client: {e}") + return 0 + return count diff --git a/packages/traceloop-sdk/traceloop/sdk/tracing/tracing.py b/packages/traceloop-sdk/traceloop/sdk/tracing/tracing.py index b7fab46364..7e56abf2aa 100644 --- a/packages/traceloop-sdk/traceloop/sdk/tracing/tracing.py +++ b/packages/traceloop-sdk/traceloop/sdk/tracing/tracing.py @@ -887,20 +887,16 @@ def init_pymysql_instrumentor(): def init_bedrock_instrumentor(should_enrich_metrics: bool): - try: - if is_package_installed("boto3"): - from opentelemetry.instrumentation.bedrock import BedrockInstrumentor + if is_package_installed("boto3"): + from opentelemetry.instrumentation.bedrock import BedrockInstrumentor - instrumentor = BedrockInstrumentor( - exception_logger=lambda e: Telemetry().log_exception(e), - enrich_token_usage=should_enrich_metrics, - ) - if not instrumentor.is_instrumented_by_opentelemetry: - instrumentor.instrument() - return True - except Exception as e: - logging.error(f"Error initializing Bedrock instrumentor: {e}") - Telemetry().log_exception(e) + instrumentor = BedrockInstrumentor( + exception_logger=lambda e: Telemetry().log_exception(e), + enrich_token_usage=should_enrich_metrics, + ) + if not instrumentor.is_instrumented_by_opentelemetry: + instrumentor.instrument() + return True return False