diff --git a/packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/__init__.py b/packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/__init__.py index edaeae76f0..933b1171fc 100644 --- a/packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/__init__.py +++ b/packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/__init__.py @@ -17,8 +17,8 @@ set_response_attributes, ) from opentelemetry.instrumentation.anthropic.streaming import ( - abuild_from_streaming_response, - build_from_streaming_response, + AnthropicAsyncStream, + AnthropicStream, WrappedAsyncMessageStreamManager, WrappedMessageStreamManager, ) @@ -558,7 +558,7 @@ def _wrap( end_time = time.time() if is_streaming_response(response): - return build_from_streaming_response( + return AnthropicStream( span, response, instance._client, @@ -679,7 +679,7 @@ async def _awrap( raise e if is_streaming_response(response): - return abuild_from_streaming_response( + return AnthropicAsyncStream( span, response, instance._client, diff --git a/packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/streaming.py b/packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/streaming.py index 1c85b6b6a2..3b032f6171 100644 --- a/packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/streaming.py +++ b/packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/streaming.py @@ -24,6 +24,7 @@ ) from opentelemetry.semconv_ai import SpanAttributes from opentelemetry.trace.status import Status, StatusCode +from wrapt import ObjectProxy logger = logging.getLogger(__name__) @@ -145,157 +146,309 @@ def _handle_streaming_response(span, event_logger, complete_response): set_streaming_response_attributes(span, complete_response.get("events")) -@dont_throw -def 
# Helper attributes of the SDK's message-stream objects that must be re-routed
# through the proxy so that every event they consume is also seen by the
# instrumentation.
_INSTRUMENTED_HELPERS = frozenset({"get_final_message", "text_stream", "until_done"})


def _is_text_delta(event):
    """Return True when *event* carries a ``text_delta`` payload with a ``text`` field."""
    delta = getattr(event, "delta", None)
    return (
        delta is not None
        and getattr(delta, "type", None) == "text_delta"
        and hasattr(delta, "text")
    )


class _InstrumentedStreamBase(ObjectProxy):
    """Telemetry bookkeeping shared by the sync and async stream proxies.

    The proxy forwards everything to the wrapped SDK stream while accumulating
    the streamed events; once the stream is exhausted (or fails) it records
    metrics, span attributes and events, then ends the span exactly once.

    All proxy-local state uses wrapt's ``_self_`` prefix. Attributes without
    that prefix would be forwarded by ``ObjectProxy.__setattr__`` onto the
    wrapped SDK object, where they could shadow its own attributes.
    """

    def __init__(
        self,
        span,
        response,
        instance,
        start_time,
        token_histogram: Histogram = None,
        choice_counter: Counter = None,
        duration_histogram: Histogram = None,
        exception_counter: Counter = None,
        event_logger: Optional[EventLogger] = None,
        kwargs: Optional[dict] = None,
    ):
        """Wrap *response* and remember everything needed to finish the span.

        Args:
            span: Span to annotate and end when the stream finishes or fails.
            response: The SDK stream object being proxied.
            instance: Client object; used for token counting when the stream
                itself reports no usage.
            start_time: ``time.time()`` timestamp taken when the request began.
            token_histogram: Optional histogram passed on to ``_set_token_usage``.
            choice_counter: Optional counter passed on to ``_set_token_usage``.
            duration_histogram: Optional histogram receiving the total duration.
            exception_counter: Optional counter incremented on stream errors.
            event_logger: Optional event logger passed on to
                ``_handle_streaming_response``.
            kwargs: Original request keyword arguments (used to count prompt
                tokens). ``None`` means "no arguments".
        """
        super().__init__(response)

        self._self_span = span
        self._self_instance = instance
        self._self_start_time = start_time
        self._self_token_histogram = token_histogram
        self._self_choice_counter = choice_counter
        self._self_duration_histogram = duration_histogram
        self._self_exception_counter = exception_counter
        self._self_event_logger = event_logger
        # A fresh dict per instance instead of a shared mutable default.
        self._self_kwargs = {} if kwargs is None else kwargs

        self._self_complete_response = {"events": [], "model": "", "usage": {}, "id": ""}
        self._self_instrumentation_completed = False

    def __getattr__(self, name):
        """Route SDK helper lookups through the instrumented implementations.

        Only helpers that the wrapped object actually provides are overridden;
        anything else (including helpers missing on a plain stream) falls back
        to the normal proxy lookup, so ``hasattr`` keeps telling the truth.
        """
        if name in _INSTRUMENTED_HELPERS and hasattr(self.__wrapped__, name):
            return getattr(self, "_instrumented_" + name)
        return super().__getattr__(name)

    def _record_item(self, item):
        """Accumulate one streamed event; telemetry failures never reach the caller."""
        try:
            _process_response_item(item, self._self_complete_response)
        except Exception as e:
            logger.warning("Failed to process streaming event, error: %s", e)

    def _fail_instrumentation(self, error):
        """Record a streaming failure and close the span (at most once)."""
        if self._self_instrumentation_completed:
            return
        self._self_instrumentation_completed = True

        try:
            if self._self_exception_counter:
                self._self_exception_counter.add(
                    1, attributes=error_metrics_attributes(error)
                )
        except Exception as e:
            logger.warning("Failed to record streaming error metric, error: %s", e)

        span = self._self_span
        if span and span.is_recording():
            span.set_status(Status(StatusCode.ERROR, str(error)))
            span.end()

    def _resolve_token_usage(self):
        """Return ``(prompt_tokens, completion_tokens)`` for the finished stream.

        Usage reported by the stream wins. Otherwise the prompt is counted from
        the request, and the completion is counted from the accumulated text
        when a model name and a ``count_tokens`` method are available; if they
        are not, the completion count is ``-1``.
        """
        response = self._self_complete_response
        usage = response.get("usage")
        if usage:
            # ``or 0`` guards against explicit ``None`` values in the payload.
            return (
                usage.get("input_tokens", 0) or 0,
                usage.get("output_tokens", 0) or 0,
            )

        instance = self._self_instance
        prompt_tokens = count_prompt_tokens_from_request(instance, self._self_kwargs)
        completion_tokens = -1
        events = response.get("events")
        if events:
            model_name = response.get("model") or None
            if model_name and hasattr(instance, "count_tokens"):
                completion_content = "".join(
                    event.get("text") for event in events if event.get("text")
                )
                completion_tokens = instance.count_tokens(completion_content)
        return prompt_tokens, completion_tokens

    def _complete_instrumentation(self):
        """Finalize metrics, span attributes and events once the stream is exhausted.

        Runs at most once. Any failure is logged instead of raised so that the
        caller still observes the normal end-of-stream signal, and the span is
        ended regardless.
        """
        if self._self_instrumentation_completed:
            return
        # Flip the flag first so a failure below cannot trigger a second attempt.
        self._self_instrumentation_completed = True

        span = self._self_span
        response = self._self_complete_response
        try:
            metric_attributes = shared_metrics_attributes(response)
            set_span_attribute(span, GEN_AI_RESPONSE_ID, response.get("id"))

            if self._self_duration_histogram:
                self._self_duration_histogram.record(
                    time.time() - self._self_start_time,
                    attributes=metric_attributes,
                )

            if Config.enrich_token_usage:
                try:
                    prompt_tokens, completion_tokens = self._resolve_token_usage()
                    _set_token_usage(
                        span,
                        response,
                        prompt_tokens,
                        completion_tokens,
                        metric_attributes,
                        self._self_token_histogram,
                        self._self_choice_counter,
                    )
                except Exception as e:
                    logger.warning("Failed to set token usage, error: %s", e)

            _handle_streaming_response(span, self._self_event_logger, response)

            if span.is_recording():
                span.set_status(Status(StatusCode.OK))
        except Exception as e:
            logger.warning(
                "Failed to finalize streaming instrumentation, error: %s", e
            )
        finally:
            if span is not None:
                span.end()


class AnthropicStream(_InstrumentedStreamBase):
    """Wrapper for Anthropic streaming responses that handles instrumentation while preserving helper methods"""

    def _instrumented_until_done(self):
        """Drain the stream through the proxy so every event is instrumented."""
        for _ in self:
            pass

    def _instrumented_get_final_message(self):
        """Drain the stream through the proxy, then defer to the SDK helper."""
        self._instrumented_until_done()
        return self.__wrapped__.get_final_message()

    @property
    def _instrumented_text_stream(self):
        """Generator over text deltas, fed by the instrumented iteration."""
        def text_generator():
            for event in self:
                if _is_text_delta(event):
                    yield event.delta.text
        return text_generator()

    def __iter__(self):
        return self

    def __next__(self):
        try:
            item = self.__wrapped__.__next__()
        except StopIteration:
            # Stream is complete - finish the span before signalling the end.
            self._complete_instrumentation()
            raise
        except Exception as e:
            self._fail_instrumentation(e)
            raise
        self._record_item(item)
        return item


class AnthropicAsyncStream(_InstrumentedStreamBase):
    """Wrapper for Anthropic async streaming responses that handles instrumentation while preserving helper methods"""

    async def _instrumented_until_done(self):
        """Drain the stream through the proxy so every event is instrumented."""
        async for _ in self:
            pass

    async def _instrumented_get_final_message(self):
        """Drain the stream through the proxy, then defer to the SDK helper."""
        await self._instrumented_until_done()
        return await self.__wrapped__.get_final_message()

    @property
    def _instrumented_text_stream(self):
        """Async generator over text deltas, fed by the instrumented iteration."""
        async def text_generator():
            async for event in self:
                if _is_text_delta(event):
                    yield event.delta.text
        return text_generator()

    def __aiter__(self):
        return self

    async def __anext__(self):
        try:
            item = await self.__wrapped__.__anext__()
        except StopAsyncIteration:
            # Stream is complete - finish the span before signalling the end.
            self._complete_instrumentation()
            raise
        except Exception as e:
            self._fail_instrumentation(e)
            raise
        self._record_item(item)
        return item
-376,8 +529,8 @@ def __init__( async def __aenter__(self): # Call the original stream manager's __aenter__ to get the actual stream stream = await self._stream_manager.__aenter__() - # Return the wrapped stream - return abuild_from_streaming_response( + # Return the proxy that preserves helper methods + return AnthropicAsyncStream( self._span, stream, self._instance, diff --git a/packages/opentelemetry-instrumentation-anthropic/tests/cassettes/test_messages/test_anthropic_streaming_helper_methods_legacy.yaml b/packages/opentelemetry-instrumentation-anthropic/tests/cassettes/test_messages/test_anthropic_streaming_helper_methods_legacy.yaml new file mode 100644 index 0000000000..88043a0f4e --- /dev/null +++ b/packages/opentelemetry-instrumentation-anthropic/tests/cassettes/test_messages/test_anthropic_streaming_helper_methods_legacy.yaml @@ -0,0 +1,152 @@ +interactions: +- request: + body: '{"max_tokens": 1024, "messages": [{"role": "user", "content": "Say hello + there!"}], "model": "claude-3-5-haiku-20241022", "stream": true}' + headers: + accept: + - application/json + accept-encoding: + - gzip, deflate + anthropic-version: + - '2023-06-01' + connection: + - keep-alive + content-length: + - '137' + content-type: + - application/json + host: + - api.anthropic.com + user-agent: + - AsyncAnthropic/Python 0.49.0 + x-stainless-arch: + - arm64 + x-stainless-async: + - async:asyncio + x-stainless-lang: + - python + x-stainless-os: + - MacOS + x-stainless-package-version: + - 0.49.0 + x-stainless-read-timeout: + - '600' + x-stainless-retry-count: + - '0' + x-stainless-runtime: + - CPython + x-stainless-runtime-version: + - 3.11.7 + x-stainless-stream-helper: + - messages + x-stainless-timeout: + - NOT_GIVEN + method: POST + uri: https://api.anthropic.com/v1/messages + response: + body: + string: 'event: message_start + + data: 
{"type":"message_start","message":{"id":"msg_015vYx5y1ygzx5WM3FSMKpqQ","type":"message","role":"assistant","model":"claude-3-5-haiku-20241022","content":[],"stop_reason":null,"stop_sequence":null,"usage":{"input_tokens":11,"cache_creation_input_tokens":0,"cache_read_input_tokens":0,"cache_creation":{"ephemeral_5m_input_tokens":0,"ephemeral_1h_input_tokens":0},"output_tokens":1,"service_tier":"standard"}} } + + + event: content_block_start + + data: {"type":"content_block_start","index":0,"content_block":{"type":"text","text":""} } + + + event: content_block_delta + + data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hello"} } + + + event: content_block_delta + + data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":" + there"} } + + + event: ping + + data: {"type": "ping"} + + + event: content_block_delta + + data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"!"} + } + + + event: content_block_stop + + data: {"type":"content_block_stop","index":0 } + + + event: message_delta + + data: {"type":"message_delta","delta":{"stop_reason":"end_turn","stop_sequence":null},"usage":{"input_tokens":11,"cache_creation_input_tokens":0,"cache_read_input_tokens":0,"output_tokens":6} } + + + event: message_stop + + data: {"type":"message_stop" } + + + ' + headers: + CF-RAY: + - 97e736f869475b61-VIE + Cache-Control: + - no-cache + Connection: + - keep-alive + Content-Type: + - text/event-stream; charset=utf-8 + Date: + - Sat, 13 Sep 2025 11:13:21 GMT + Server: + - cloudflare + Transfer-Encoding: + - chunked + X-Robots-Tag: + - none + anthropic-organization-id: + - 617d109c-a187-4902-889d-689223d134aa + anthropic-ratelimit-input-tokens-limit: + - '400000' + anthropic-ratelimit-input-tokens-remaining: + - '400000' + anthropic-ratelimit-input-tokens-reset: + - '2025-09-13T11:13:20Z' + anthropic-ratelimit-output-tokens-limit: + - '80000' + anthropic-ratelimit-output-tokens-remaining: + - '80000' 
+ anthropic-ratelimit-output-tokens-reset: + - '2025-09-13T11:13:20Z' + anthropic-ratelimit-requests-limit: + - '4000' + anthropic-ratelimit-requests-remaining: + - '3999' + anthropic-ratelimit-requests-reset: + - '2025-09-13T11:13:20Z' + anthropic-ratelimit-tokens-limit: + - '480000' + anthropic-ratelimit-tokens-remaining: + - '480000' + anthropic-ratelimit-tokens-reset: + - '2025-09-13T11:13:20Z' + cf-cache-status: + - DYNAMIC + request-id: + - req_011CT6JRYAtJjioFin4pYtvp + strict-transport-security: + - max-age=31536000; includeSubDomains; preload + via: + - 1.1 google + x-envoy-upstream-service-time: + - '388' + status: + code: 200 + message: OK +version: 1 diff --git a/packages/opentelemetry-instrumentation-anthropic/tests/cassettes/test_messages/test_anthropic_sync_streaming_helper_methods_legacy.yaml b/packages/opentelemetry-instrumentation-anthropic/tests/cassettes/test_messages/test_anthropic_sync_streaming_helper_methods_legacy.yaml new file mode 100644 index 0000000000..8a625bbe4a --- /dev/null +++ b/packages/opentelemetry-instrumentation-anthropic/tests/cassettes/test_messages/test_anthropic_sync_streaming_helper_methods_legacy.yaml @@ -0,0 +1,141 @@ +interactions: +- request: + body: '{"max_tokens": 1024, "messages": [{"role": "user", "content": "Say hello + there!"}], "model": "claude-3-5-haiku-20241022", "stream": true}' + headers: + accept: + - application/json + accept-encoding: + - gzip, deflate + anthropic-version: + - '2023-06-01' + connection: + - keep-alive + content-length: + - '137' + content-type: + - application/json + host: + - api.anthropic.com + user-agent: + - Anthropic/Python 0.49.0 + x-stainless-arch: + - arm64 + x-stainless-async: + - 'false' + x-stainless-lang: + - python + x-stainless-os: + - MacOS + x-stainless-package-version: + - 0.49.0 + x-stainless-read-timeout: + - '600' + x-stainless-retry-count: + - '0' + x-stainless-runtime: + - CPython + x-stainless-runtime-version: + - 3.11.7 + x-stainless-stream-helper: + - messages + 
x-stainless-timeout: + - NOT_GIVEN + method: POST + uri: https://api.anthropic.com/v1/messages + response: + body: + string: 'event: message_start + + data: {"type":"message_start","message":{"id":"msg_01MqperduQ5tQ7EVZSDJemPr","type":"message","role":"assistant","model":"claude-3-5-haiku-20241022","content":[],"stop_reason":null,"stop_sequence":null,"usage":{"input_tokens":11,"cache_creation_input_tokens":0,"cache_read_input_tokens":0,"cache_creation":{"ephemeral_5m_input_tokens":0,"ephemeral_1h_input_tokens":0},"output_tokens":1,"service_tier":"standard"}} } + + + event: content_block_start + + data: {"type":"content_block_start","index":0,"content_block":{"type":"text","text":""} } + + + event: content_block_delta + + data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hello"} } + + + event: content_block_delta + + data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":" + there!"} } + + + event: content_block_stop + + data: {"type":"content_block_stop","index":0 } + + + event: message_delta + + data: {"type":"message_delta","delta":{"stop_reason":"end_turn","stop_sequence":null},"usage":{"input_tokens":11,"cache_creation_input_tokens":0,"cache_read_input_tokens":0,"output_tokens":6} } + + + event: message_stop + + data: {"type":"message_stop" } + + + ' + headers: + CF-RAY: + - 97e736fe6a895aeb-VIE + Cache-Control: + - no-cache + Connection: + - keep-alive + Content-Type: + - text/event-stream; charset=utf-8 + Date: + - Sat, 13 Sep 2025 11:13:22 GMT + Server: + - cloudflare + Transfer-Encoding: + - chunked + X-Robots-Tag: + - none + anthropic-organization-id: + - 617d109c-a187-4902-889d-689223d134aa + anthropic-ratelimit-input-tokens-limit: + - '400000' + anthropic-ratelimit-input-tokens-remaining: + - '400000' + anthropic-ratelimit-input-tokens-reset: + - '2025-09-13T11:13:21Z' + anthropic-ratelimit-output-tokens-limit: + - '80000' + anthropic-ratelimit-output-tokens-remaining: + - '80000' + 
anthropic-ratelimit-output-tokens-reset: + - '2025-09-13T11:13:21Z' + anthropic-ratelimit-requests-limit: + - '4000' + anthropic-ratelimit-requests-remaining: + - '3999' + anthropic-ratelimit-requests-reset: + - '2025-09-13T11:13:21Z' + anthropic-ratelimit-tokens-limit: + - '480000' + anthropic-ratelimit-tokens-remaining: + - '480000' + anthropic-ratelimit-tokens-reset: + - '2025-09-13T11:13:21Z' + cf-cache-status: + - DYNAMIC + request-id: + - req_011CT6JRcKB2ZPLaPAXKZ4Wa + strict-transport-security: + - max-age=31536000; includeSubDomains; preload + via: + - 1.1 google + x-envoy-upstream-service-time: + - '573' + status: + code: 200 + message: OK +version: 1 diff --git a/packages/opentelemetry-instrumentation-anthropic/tests/cassettes/test_messages/test_anthropic_text_stream_helper_method_legacy.yaml b/packages/opentelemetry-instrumentation-anthropic/tests/cassettes/test_messages/test_anthropic_text_stream_helper_method_legacy.yaml new file mode 100644 index 0000000000..b3b40b89ba --- /dev/null +++ b/packages/opentelemetry-instrumentation-anthropic/tests/cassettes/test_messages/test_anthropic_text_stream_helper_method_legacy.yaml @@ -0,0 +1,141 @@ +interactions: +- request: + body: '{"max_tokens": 1024, "messages": [{"role": "user", "content": "Say hello + there!"}], "model": "claude-3-5-haiku-20241022", "stream": true}' + headers: + accept: + - application/json + accept-encoding: + - gzip, deflate + anthropic-version: + - '2023-06-01' + connection: + - keep-alive + content-length: + - '137' + content-type: + - application/json + host: + - api.anthropic.com + user-agent: + - AsyncAnthropic/Python 0.49.0 + x-stainless-arch: + - arm64 + x-stainless-async: + - async:asyncio + x-stainless-lang: + - python + x-stainless-os: + - MacOS + x-stainless-package-version: + - 0.49.0 + x-stainless-read-timeout: + - '600' + x-stainless-retry-count: + - '0' + x-stainless-runtime: + - CPython + x-stainless-runtime-version: + - 3.11.7 + x-stainless-stream-helper: + - messages + 
x-stainless-timeout: + - NOT_GIVEN + method: POST + uri: https://api.anthropic.com/v1/messages + response: + body: + string: 'event: message_start + + data: {"type":"message_start","message":{"id":"msg_015gfMD3dPMTLGst8hsmvWJw","type":"message","role":"assistant","model":"claude-3-5-haiku-20241022","content":[],"stop_reason":null,"stop_sequence":null,"usage":{"input_tokens":11,"cache_creation_input_tokens":0,"cache_read_input_tokens":0,"cache_creation":{"ephemeral_5m_input_tokens":0,"ephemeral_1h_input_tokens":0},"output_tokens":2,"service_tier":"standard"}} } + + + event: content_block_start + + data: {"type":"content_block_start","index":0,"content_block":{"type":"text","text":""}} + + + event: content_block_delta + + data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hello + there"} } + + + event: content_block_delta + + data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"!"} } + + + event: content_block_stop + + data: {"type":"content_block_stop","index":0 } + + + event: message_delta + + data: {"type":"message_delta","delta":{"stop_reason":"end_turn","stop_sequence":null},"usage":{"input_tokens":11,"cache_creation_input_tokens":0,"cache_read_input_tokens":0,"output_tokens":6} } + + + event: message_stop + + data: {"type":"message_stop" } + + + ' + headers: + CF-RAY: + - 97e73f7559cb27b8-VIE + Cache-Control: + - no-cache + Connection: + - keep-alive + Content-Type: + - text/event-stream; charset=utf-8 + Date: + - Sat, 13 Sep 2025 11:19:08 GMT + Server: + - cloudflare + Transfer-Encoding: + - chunked + X-Robots-Tag: + - none + anthropic-organization-id: + - 617d109c-a187-4902-889d-689223d134aa + anthropic-ratelimit-input-tokens-limit: + - '400000' + anthropic-ratelimit-input-tokens-remaining: + - '400000' + anthropic-ratelimit-input-tokens-reset: + - '2025-09-13T11:19:08Z' + anthropic-ratelimit-output-tokens-limit: + - '80000' + anthropic-ratelimit-output-tokens-remaining: + - '80000' + 
anthropic-ratelimit-output-tokens-reset: + - '2025-09-13T11:19:08Z' + anthropic-ratelimit-requests-limit: + - '4000' + anthropic-ratelimit-requests-remaining: + - '3999' + anthropic-ratelimit-requests-reset: + - '2025-09-13T11:19:08Z' + anthropic-ratelimit-tokens-limit: + - '480000' + anthropic-ratelimit-tokens-remaining: + - '480000' + anthropic-ratelimit-tokens-reset: + - '2025-09-13T11:19:08Z' + cf-cache-status: + - DYNAMIC + request-id: + - req_011CT6JsAYbhj2squ2Q8fn85 + strict-transport-security: + - max-age=31536000; includeSubDomains; preload + via: + - 1.1 google + x-envoy-upstream-service-time: + - '409' + status: + code: 200 + message: OK +version: 1 diff --git a/packages/opentelemetry-instrumentation-anthropic/tests/test_messages.py b/packages/opentelemetry-instrumentation-anthropic/tests/test_messages.py index 29ff18c355..833308d5d7 100644 --- a/packages/opentelemetry-instrumentation-anthropic/tests/test_messages.py +++ b/packages/opentelemetry-instrumentation-anthropic/tests/test_messages.py @@ -2840,3 +2840,87 @@ async def test_async_anthropic_message_stream_manager_with_events_with_no_conten "message": {}, } assert_message_in_logs(logs[1], "gen_ai.choice", choice_event) + + +@pytest.mark.vcr() +@pytest.mark.asyncio +async def test_anthropic_streaming_helper_methods_legacy( + instrument_legacy, async_anthropic_client, span_exporter, log_exporter, reader +): + """Test that streaming helper methods like get_final_message() work with instrumentation""" + # Test async stream with get_final_message + async with async_anthropic_client.messages.stream( + max_tokens=1024, + messages=[ + { + "role": "user", + "content": "Say hello there!", + } + ], + model="claude-3-5-haiku-20241022", + ) as stream: + # Test that get_final_message() actually works (this is the main fix verification) + message = await stream.get_final_message() + assert message is not None + assert hasattr(message, 'content') + assert len(message.content) > 0 + # Test that the stream still has 
other helper methods available + assert hasattr(stream, 'text_stream') + assert hasattr(stream, 'until_done') + + spans = span_exporter.get_finished_spans() + assert len(spans) == 1, f"Expected 1 span, got {len(spans)}" + assert spans[0].name == "anthropic.chat" + + +@pytest.mark.vcr() +@pytest.mark.asyncio +async def test_anthropic_text_stream_helper_method_legacy( + instrument_legacy, async_anthropic_client, span_exporter +): + """Test that text_stream() helper method works with instrumentation""" + async with async_anthropic_client.messages.stream( + max_tokens=1024, + messages=[ + { + "role": "user", + "content": "Say hello there!", + } + ], + model="claude-3-5-haiku-20241022", + ) as stream: + # Test that text_stream() works + text_content = "" + async for text in stream.text_stream: + text_content += text + assert len(text_content) > 0 + spans = span_exporter.get_finished_spans() + print(f"Number of spans created: {len(spans)}") + assert len(spans) == 1, f"Expected 1 span, got {len(spans)}" + assert spans[0].name == "anthropic.chat" + + +@pytest.mark.vcr() +def test_anthropic_sync_streaming_helper_methods_legacy( + instrument_legacy, anthropic_client, span_exporter +): + """Test that sync streaming helper methods work with instrumentation""" + # Test sync stream - this should work similarly without helper methods causing issues + with anthropic_client.messages.stream( + max_tokens=1024, + messages=[ + { + "role": "user", + "content": "Say hello there!", + } + ], + model="claude-3-5-haiku-20241022", + ) as stream: + # Collect all events + events = [] + for event in stream: + events.append(event) + assert len(events) > 0 + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + assert spans[0].name == "anthropic.chat"