diff --git a/packages/opentelemetry-instrumentation-openai/.python-version b/packages/opentelemetry-instrumentation-openai/.python-version
index 11aaa06863..2c0733315e 100644
--- a/packages/opentelemetry-instrumentation-openai/.python-version
+++ b/packages/opentelemetry-instrumentation-openai/.python-version
@@ -1 +1 @@
-3.9.5
+3.11
diff --git a/packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/__init__.py b/packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/__init__.py
index 904b0b38d2..474d637784 100644
--- a/packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/__init__.py
+++ b/packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/__init__.py
@@ -328,20 +328,39 @@ def _extract_model_name_from_provider_format(model_name):
     return model_name
 
 
-def is_streaming_response(response):
+def _is_legacy_api_response(obj):
+    """Check if the object is an openai-python LegacyAPIResponse (raw-response wrapper, e.g. from LiteLLM)"""
+    return hasattr(obj, 'parse') and callable(getattr(obj, 'parse'))
+
+
+def is_streaming_response(response, kwargs=None):
     if is_openai_v1():
-        return isinstance(response, openai.Stream) or isinstance(
-            response, openai.AsyncStream
-        )
+        # Check if it's directly a stream
+        if isinstance(response, openai.Stream) or isinstance(response, openai.AsyncStream):
+            return True
+
+        # For a LegacyAPIResponse, check whether the original request was streaming.
+        # Note: LegacyAPIResponse handling itself is done at the wrapper level
+        if _is_legacy_api_response(response):
+            if kwargs and kwargs.get('stream'):
+                return True
+
+        return False
     return isinstance(response, types.GeneratorType) or isinstance(
        response, types.AsyncGeneratorType
    )
 
 
-def model_as_dict(model):
+def model_as_dict(model, is_streaming=False):
     if isinstance(model, dict):
         return model
+
+    # For a streaming LegacyAPIResponse we can't extract completion data after the stream is consumed.
+    # Return an empty dict to prevent crashes - completion data will come from the stream cleanup.
+    if _is_legacy_api_response(model) and is_streaming:
+        return {}
+
    if _PYDANTIC_VERSION < "2.0.0":
        return model.dict()
    if hasattr(model, "model_dump"):
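A note on why the `kwargs` parameter is needed: a `LegacyAPIResponse` is what the openai-python v1 SDK hands back on its raw-response path (`with_raw_response`, which LiteLLM uses under the hood). The wrapper hides the underlying `Stream` until `parse()` is called, so at wrap time the request's own `stream` flag is the only reliable streaming signal. A minimal sketch, assuming a configured `OPENAI_API_KEY`:

```python
# Sketch only: how a LegacyAPIResponse reaches the instrumentation.
import openai

client = openai.OpenAI()

raw = client.chat.completions.with_raw_response.create(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "Say hello"}],
    stream=True,
)

# `raw` duck-types as a LegacyAPIResponse (it has a callable `parse`), but it
# is not an openai.Stream, so isinstance checks alone would miss it. Only the
# request kwargs reveal that this call is streaming:
#   is_streaming_response(raw)                    -> False
#   is_streaming_response(raw, {"stream": True})  -> True
stream = raw.parse()  # the underlying openai.Stream
for chunk in stream:
    pass
```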
diff --git a/packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/chat_wrappers.py b/packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/chat_wrappers.py
index 217df4cff0..f2f1201332 100644
--- a/packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/chat_wrappers.py
+++ b/packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/chat_wrappers.py
@@ -11,6 +11,7 @@ from opentelemetry.instrumentation.openai.shared import (
     OPENAI_LLM_USAGE_TOKEN_TYPES,
     _get_openai_base_url,
+    _is_legacy_api_response,
     _set_client_attributes,
     _set_functions_attributes,
     _set_request_attributes,
@@ -114,12 +115,24 @@ def chat_wrapper(
         raise
 
-    if is_streaming_response(response):
+    if is_streaming_response(response, kwargs):
         # span will be closed after the generator is done
         if is_openai_v1():
-            return ChatStream(
+            # Handle LegacyAPIResponse by parsing it first, without mutating the original
+            actual_response = response
+            parsed_successfully = False
+            if _is_legacy_api_response(response) and kwargs.get('stream'):
+                try:
+                    actual_response = response.parse()
+                    parsed_successfully = True
+                except Exception as e:
+                    logger.warning("Failed to parse LegacyAPIResponse: %s", e)
+                    # Fall back to the original response
+                    actual_response = response
+
+            stream = ChatStream(
                 span,
-                response,
+                actual_response,
                 instance,
                 token_counter,
                 choice_counter,
@@ -129,6 +142,9 @@
                 start_time,
                 kwargs,
             )
+            if parsed_successfully:
+                stream._response_was_parsed = True
+            return stream
         else:
             return _build_from_streaming_response(
                 span,
@@ -215,12 +231,24 @@ async def achat_wrapper(
         raise
 
-    if is_streaming_response(response):
+    if is_streaming_response(response, kwargs):
         # span will be closed after the generator is done
         if is_openai_v1():
-            return ChatStream(
+            # Handle LegacyAPIResponse by parsing it first, without mutating the original
+            actual_response = response
+            parsed_successfully = False
+            if _is_legacy_api_response(response) and kwargs.get('stream'):
+                try:
+                    actual_response = response.parse()
+                    parsed_successfully = True
+                except Exception as e:
+                    logger.warning("Failed to parse LegacyAPIResponse: %s", e)
+                    # Fall back to the original response
+                    actual_response = response
+
+            stream = ChatStream(
                 span,
-                response,
+                actual_response,
                 instance,
                 token_counter,
                 choice_counter,
@@ -230,6 +258,9 @@
                 start_time,
                 kwargs,
             )
+            if parsed_successfully:
+                stream._response_was_parsed = True
+            return stream
         else:
             return _abuild_from_streaming_response(
                 span,
@@ -298,7 +329,7 @@ def _handle_response(
     is_streaming: bool = False,
 ):
     if is_openai_v1():
-        response_dict = model_as_dict(response)
+        response_dict = model_as_dict(response, is_streaming)
     else:
         response_dict = response
@@ -609,9 +640,61 @@ def __init__(
         self._cleanup_completed = False
         self._cleanup_lock = threading.Lock()
 
+    def parse(self):
+        """Handle LegacyAPIResponse.parse() calls from LiteLLM"""
+        if hasattr(self.__wrapped__, 'parse'):
+            # Parse the response to get the actual stream
+            parsed_stream = self.__wrapped__.parse()
+
+            # Create a new ChatStream that inherits the current response accumulation
+            new_chat_stream = ChatStream(
+                self._span,
+                parsed_stream,
+                self._instance,
+                self._token_counter,
+                self._choice_counter,
+                self._duration_histogram,
+                self._streaming_time_to_first_token,
+                self._streaming_time_to_generate,
+                self._start_time,
+                self._request_kwargs,
+            )
+
+            # Transfer any accumulated response data to the new stream
+            new_chat_stream._complete_response = self._complete_response.copy()
+
+            # Mark this stream as no longer responsible for span completion,
+            # since the new stream will handle it
+            self._cleanup_completed = True
+
+            return new_chat_stream
+        else:
+            return self
+
+    def close(self):
+        """Close the stream and ensure cleanup"""
+        self._ensure_cleanup()
+        if hasattr(self.__wrapped__, 'close'):
+            return self.__wrapped__.close()
+
+    async def aclose(self):
+        """Close the async stream and ensure cleanup"""
+        self._ensure_cleanup()
+        # openai.AsyncStream.close() is a coroutine, so it is awaited here
+        if hasattr(self.__wrapped__, 'close'):
+            return await self.__wrapped__.close()
+
     def __del__(self):
         """Cleanup when object is garbage collected"""
         if hasattr(self, '_cleanup_completed') and not self._cleanup_completed:
+            # If we have accumulated completion data, make sure it gets set on
+            # the span before the wrapper is collected
+            if (hasattr(self, '_complete_response') and
+                    (self._complete_response.get('usage') or self._complete_response.get('choices')) and
+                    self._span and self._span.is_recording()):
+                _set_response_attributes(self._span, self._complete_response)
+                if should_send_prompts():
+                    _set_completions(self._span, self._complete_response.get("choices"))
+                self._span.set_status(Status(StatusCode.OK))
             self._ensure_cleanup()
 
     def __enter__(self):
@@ -638,7 +721,23 @@ async def __aenter__(self):
         return self
 
    async def __aexit__(self, exc_type, exc_val, exc_tb):
-        await self.__wrapped__.__aexit__(exc_type, exc_val, exc_tb)
+        cleanup_exception = None
+        try:
+            self._ensure_cleanup()
+        except Exception as e:
+            cleanup_exception = e
+            # Don't re-raise, to avoid masking the original exception
+
+        result = False
+        if hasattr(self.__wrapped__, "__aexit__"):
+            result = await self.__wrapped__.__aexit__(exc_type, exc_val, exc_tb)
+
+        if cleanup_exception:
+            # Log the cleanup exception but don't affect context manager behavior
+            logger.debug(
+                "Error during ChatStream cleanup in __aexit__: %s", cleanup_exception)
+
+        return result
 
     def __iter__(self):
         return self
@@ -799,6 +901,16 @@
         # Record basic span attributes even without complete response
         if self._span and self._span.is_recording():
             _set_response_attributes(self._span, self._complete_response)
+
+            # Also set completion attributes for any accumulated choices
+            if should_send_prompts():
+                _set_completions(self._span, self._complete_response.get("choices"))
+
+            # For a LegacyAPIResponse that was successfully parsed, set the status to OK
+            if getattr(self, '_response_was_parsed', False):
+                self._span.set_status(Status(StatusCode.OK))
+                # Set a basic finish reason, since we know the response completed
+                self._span.set_attribute("gen_ai.response.finish_reason", "stop")
 
         # Record partial token metrics if we have any data
         if self._complete_response.get("choices") or self._request_kwargs:
diff --git a/packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/completion_wrappers.py b/packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/completion_wrappers.py
index 254608068c..40f1c0f686 100644
--- a/packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/completion_wrappers.py
+++ b/packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/completion_wrappers.py
@@ -69,7 +69,7 @@ def completion_wrapper(tracer, wrapped, instance, args, kwargs):
         span.end()
         raise
 
-    if is_streaming_response(response):
+    if is_streaming_response(response, kwargs):
         # span will be closed after the generator is done
         return _build_from_streaming_response(span, kwargs, response)
     else:
@@ -105,7 +105,7 @@ async def acompletion_wrapper(tracer, wrapped, instance, args, kwargs):
         span.end()
         raise
 
-    if is_streaming_response(response):
+    if is_streaming_response(response, kwargs):
         # span will be closed after the generator is done
         return _abuild_from_streaming_response(span, kwargs, response)
     else:
diff --git a/packages/opentelemetry-instrumentation-openai/tests/traces/cassettes/test_legacy_api_response/test_raw_response_header_completion_attributes.yaml b/packages/opentelemetry-instrumentation-openai/tests/traces/cassettes/test_legacy_api_response/test_raw_response_header_completion_attributes.yaml
new file mode 100644
index 0000000000..3a1e66e2f6
--- /dev/null
+++ 
b/packages/opentelemetry-instrumentation-openai/tests/traces/cassettes/test_legacy_api_response/test_raw_response_header_completion_attributes.yaml @@ -0,0 +1,86 @@ +interactions: +- request: + body: '{"messages": [{"role": "user", "content": "Say hello"}], "model": "gpt-3.5-turbo", + "stream": true}' + headers: + accept: + - application/json + accept-encoding: + - gzip, deflate + connection: + - keep-alive + content-length: + - '98' + content-type: + - application/json + host: + - api.openai.com + traceparent: + - 00-29b1f3126ef228081d7c06cb664d7944-c833f72f872ef8ab-01 + user-agent: + - OpenAI/Python 1.93.0 + x-stainless-arch: + - arm64 + x-stainless-async: + - 'false' + x-stainless-lang: + - python + x-stainless-os: + - MacOS + x-stainless-package-version: + - 1.93.0 + x-stainless-raw-response: + - 'true' + x-stainless-read-timeout: + - '600' + x-stainless-retry-count: + - '0' + x-stainless-runtime: + - CPython + x-stainless-runtime-version: + - 3.12.1 + method: POST + uri: https://api.openai.com/v1/chat/completions + response: + body: + string: "{\n \"error\": {\n \"message\": \"Incorrect API key provided: + test_api_key. You can find your API key at https://platform.openai.com/account/api-keys.\",\n + \ \"type\": \"invalid_request_error\",\n \"param\": null,\n \"code\": + \"invalid_api_key\"\n }\n}\n" + headers: + CF-RAY: + - 970a84031b9fc222-TLV + Connection: + - keep-alive + Content-Length: + - '262' + Content-Type: + - application/json; charset=utf-8 + Date: + - Sun, 17 Aug 2025 16:23:23 GMT + Server: + - cloudflare + Set-Cookie: + - __cf_bm=CnO4N7IJLSVPZ1NJ1sXGrDdWIzyN41oYvTESuNDO7T0-1755447803-1.0.1.1-6VdBrSZbrcn1jl6eSOEfCU0vIIwWabzVpC2g9rFtnJ8T06pw_RH1ussZACigLi9axHA6Rn8ObEttzPb1mOFxmMyN8jcG978QILGVfIqS2_A; + path=/; expires=Sun, 17-Aug-25 16:53:23 GMT; domain=.api.openai.com; HttpOnly; + Secure; SameSite=None + - _cfuvid=OYHhC81cl5STQdGR0AnkjRK_.Pzo0gGLZWk7EwG9w88-1755447803697-0.0.1.1-604800000; + path=/; domain=.api.openai.com; HttpOnly; Secure; SameSite=None + Strict-Transport-Security: + - max-age=31536000; includeSubDomains; preload + X-Content-Type-Options: + - nosniff + alt-svc: + - h3=":443"; ma=86400 + cf-cache-status: + - DYNAMIC + vary: + - Origin + x-envoy-upstream-service-time: + - '1' + x-request-id: + - req_a314fe776829421484a993c64b674cee + status: + code: 401 + message: Unauthorized +version: 1 diff --git a/packages/opentelemetry-instrumentation-openai/tests/traces/cassettes/test_legacy_api_response/test_raw_response_header_streaming_completion_attributes.yaml b/packages/opentelemetry-instrumentation-openai/tests/traces/cassettes/test_legacy_api_response/test_raw_response_header_streaming_completion_attributes.yaml new file mode 100644 index 0000000000..fc505e866d --- /dev/null +++ b/packages/opentelemetry-instrumentation-openai/tests/traces/cassettes/test_legacy_api_response/test_raw_response_header_streaming_completion_attributes.yaml @@ -0,0 +1,145 @@ +interactions: +- request: + body: '{"messages": [{"role": "user", "content": "Say hello"}], "model": "gpt-3.5-turbo", + "stream": true}' + headers: + accept: + - application/json + accept-encoding: + - gzip, deflate + connection: + - keep-alive + content-length: + - '98' + content-type: + - application/json + host: + - api.openai.com + traceparent: + - 00-b33abb1c741c778e1fe289cd3b0e9c96-b162515cfe259cb1-01 + user-agent: + - OpenAI/Python 1.99.7 + x-stainless-arch: + - arm64 + x-stainless-async: + - 'false' + x-stainless-lang: + - python + x-stainless-os: + - MacOS + x-stainless-package-version: + - 1.99.7 + 
x-stainless-raw-response: + - 'true' + x-stainless-read-timeout: + - '600' + x-stainless-retry-count: + - '0' + x-stainless-runtime: + - CPython + x-stainless-runtime-version: + - 3.12.1 + method: POST + uri: https://api.openai.com/v1/chat/completions + response: + body: + string: 'data: {"id":"chatcmpl-C5aVwcziDYGS0roMboAkpY933ujYM","object":"chat.completion.chunk","created":1755448036,"model":"gpt-3.5-turbo-0125","service_tier":"default","system_fingerprint":null,"choices":[{"index":0,"delta":{"role":"assistant","content":"","refusal":null},"logprobs":null,"finish_reason":null}],"obfuscation":"HW7JM"} + + + data: {"id":"chatcmpl-C5aVwcziDYGS0roMboAkpY933ujYM","object":"chat.completion.chunk","created":1755448036,"model":"gpt-3.5-turbo-0125","service_tier":"default","system_fingerprint":null,"choices":[{"index":0,"delta":{"content":"Hello"},"logprobs":null,"finish_reason":null}],"obfuscation":"D9"} + + + data: {"id":"chatcmpl-C5aVwcziDYGS0roMboAkpY933ujYM","object":"chat.completion.chunk","created":1755448036,"model":"gpt-3.5-turbo-0125","service_tier":"default","system_fingerprint":null,"choices":[{"index":0,"delta":{"content":"!"},"logprobs":null,"finish_reason":null}],"obfuscation":"7K3DC8"} + + + data: {"id":"chatcmpl-C5aVwcziDYGS0roMboAkpY933ujYM","object":"chat.completion.chunk","created":1755448036,"model":"gpt-3.5-turbo-0125","service_tier":"default","system_fingerprint":null,"choices":[{"index":0,"delta":{"content":" + How"},"logprobs":null,"finish_reason":null}],"obfuscation":"WiS"} + + + data: {"id":"chatcmpl-C5aVwcziDYGS0roMboAkpY933ujYM","object":"chat.completion.chunk","created":1755448036,"model":"gpt-3.5-turbo-0125","service_tier":"default","system_fingerprint":null,"choices":[{"index":0,"delta":{"content":" + can"},"logprobs":null,"finish_reason":null}],"obfuscation":"dCZ"} + + + data: {"id":"chatcmpl-C5aVwcziDYGS0roMboAkpY933ujYM","object":"chat.completion.chunk","created":1755448036,"model":"gpt-3.5-turbo-0125","service_tier":"default","system_fingerprint":null,"choices":[{"index":0,"delta":{"content":" + I"},"logprobs":null,"finish_reason":null}],"obfuscation":"DGO9p"} + + + data: {"id":"chatcmpl-C5aVwcziDYGS0roMboAkpY933ujYM","object":"chat.completion.chunk","created":1755448036,"model":"gpt-3.5-turbo-0125","service_tier":"default","system_fingerprint":null,"choices":[{"index":0,"delta":{"content":" + assist"},"logprobs":null,"finish_reason":null}],"obfuscation":""} + + + data: {"id":"chatcmpl-C5aVwcziDYGS0roMboAkpY933ujYM","object":"chat.completion.chunk","created":1755448036,"model":"gpt-3.5-turbo-0125","service_tier":"default","system_fingerprint":null,"choices":[{"index":0,"delta":{"content":" + you"},"logprobs":null,"finish_reason":null}],"obfuscation":"OIV"} + + + data: {"id":"chatcmpl-C5aVwcziDYGS0roMboAkpY933ujYM","object":"chat.completion.chunk","created":1755448036,"model":"gpt-3.5-turbo-0125","service_tier":"default","system_fingerprint":null,"choices":[{"index":0,"delta":{"content":" + today"},"logprobs":null,"finish_reason":null}],"obfuscation":"w"} + + + data: {"id":"chatcmpl-C5aVwcziDYGS0roMboAkpY933ujYM","object":"chat.completion.chunk","created":1755448036,"model":"gpt-3.5-turbo-0125","service_tier":"default","system_fingerprint":null,"choices":[{"index":0,"delta":{"content":"?"},"logprobs":null,"finish_reason":null}],"obfuscation":"KkB2Bz"} + + + data: 
{"id":"chatcmpl-C5aVwcziDYGS0roMboAkpY933ujYM","object":"chat.completion.chunk","created":1755448036,"model":"gpt-3.5-turbo-0125","service_tier":"default","system_fingerprint":null,"choices":[{"index":0,"delta":{},"logprobs":null,"finish_reason":"stop"}],"obfuscation":"x"} + + + data: [DONE] + + + ' + headers: + CF-RAY: + - 970a89b05c392253-TLV + Connection: + - keep-alive + Content-Type: + - text/event-stream; charset=utf-8 + Date: + - Sun, 17 Aug 2025 16:27:16 GMT + Server: + - cloudflare + Set-Cookie: + - __cf_bm=u8xrIzhWWeQ4EKdvALOqYMoRkfkygsO_SmbCs4nuz6g-1755448036-1.0.1.1-79K1BXstf3jOeIig9Wr_rNvYR_SDLD5rxQRbW1dRV3wGvg4q8Iiek0b9fzqwqmf6rWC9q3GBuNeorf85TE1tr.lmNxYh0x1lBejVO2Or3CQ; + path=/; expires=Sun, 17-Aug-25 16:57:16 GMT; domain=.api.openai.com; HttpOnly; + Secure; SameSite=None + - _cfuvid=P6Lb2tVtZSlO4afd70zEtpZqCmAJ.tqDpKsE17TqOE8-1755448036360-0.0.1.1-604800000; + path=/; domain=.api.openai.com; HttpOnly; Secure; SameSite=None + Strict-Transport-Security: + - max-age=31536000; includeSubDomains; preload + Transfer-Encoding: + - chunked + X-Content-Type-Options: + - nosniff + access-control-expose-headers: + - X-Request-ID + alt-svc: + - h3=":443"; ma=86400 + cf-cache-status: + - DYNAMIC + openai-organization: + - traceloop + openai-processing-ms: + - '233' + openai-project: + - proj_tzz1TbPPOXaf6j9tEkVUBIAa + openai-version: + - '2020-10-01' + x-envoy-upstream-service-time: + - '266' + x-ratelimit-limit-requests: + - '10000' + x-ratelimit-limit-tokens: + - '50000000' + x-ratelimit-remaining-requests: + - '9999' + x-ratelimit-remaining-tokens: + - '49999995' + x-ratelimit-reset-requests: + - 6ms + x-ratelimit-reset-tokens: + - 0s + x-request-id: + - req_1336ff5357994f4297346daa5b3cb4a6 + status: + code: 200 + message: OK +version: 1 diff --git a/packages/opentelemetry-instrumentation-openai/tests/traces/cassettes/test_legacy_api_response/test_raw_response_header_tool_calls_completion_attributes.yaml b/packages/opentelemetry-instrumentation-openai/tests/traces/cassettes/test_legacy_api_response/test_raw_response_header_tool_calls_completion_attributes.yaml new file mode 100644 index 0000000000..8da32b6d18 --- /dev/null +++ b/packages/opentelemetry-instrumentation-openai/tests/traces/cassettes/test_legacy_api_response/test_raw_response_header_tool_calls_completion_attributes.yaml @@ -0,0 +1,117 @@ +interactions: +- request: + body: '{"messages": [{"role": "user", "content": "What''s the weather like?"}], + "model": "gpt-3.5-turbo", "stream": true, "tools": [{"type": "function", "function": + {"name": "get_weather", "description": "Get weather for a location", "parameters": + {"type": "object", "properties": {"location": {"type": "string"}}}}}]}' + headers: + accept: + - application/json + accept-encoding: + - gzip, deflate + connection: + - keep-alive + content-length: + - '310' + content-type: + - application/json + host: + - api.openai.com + traceparent: + - 00-3be95983eeca259b883e8c512fbb0c3b-347fe52022ea5c3e-01 + user-agent: + - OpenAI/Python 1.99.7 + x-stainless-arch: + - arm64 + x-stainless-async: + - 'false' + x-stainless-lang: + - python + x-stainless-os: + - MacOS + x-stainless-package-version: + - 1.99.7 + x-stainless-raw-response: + - 'true' + x-stainless-read-timeout: + - '600' + x-stainless-retry-count: + - '0' + x-stainless-runtime: + - CPython + x-stainless-runtime-version: + - 3.12.1 + method: POST + uri: https://api.openai.com/v1/chat/completions + response: + body: + string: 'data: 
{"id":"chatcmpl-C5aVw3NCHclNyhLAFPYaxpO4TlJSv","object":"chat.completion.chunk","created":1755448036,"model":"gpt-3.5-turbo-0125","service_tier":"default","system_fingerprint":null,"choices":[{"index":0,"delta":{"role":"assistant","content":null,"tool_calls":[{"index":0,"id":"call_0bJ5BETGWWJPe07nuetwxiKI","type":"function","function":{"name":"get_weather","arguments":""}}],"refusal":null},"logprobs":null,"finish_reason":null}],"obfuscation":""} + + + data: {"id":"chatcmpl-C5aVw3NCHclNyhLAFPYaxpO4TlJSv","object":"chat.completion.chunk","created":1755448036,"model":"gpt-3.5-turbo-0125","service_tier":"default","system_fingerprint":null,"choices":[{"index":0,"delta":{"tool_calls":[{"index":0,"function":{"arguments":"{}"}}]},"logprobs":null,"finish_reason":null}],"obfuscation":"cZ8CiACTxew"} + + + data: {"id":"chatcmpl-C5aVw3NCHclNyhLAFPYaxpO4TlJSv","object":"chat.completion.chunk","created":1755448036,"model":"gpt-3.5-turbo-0125","service_tier":"default","system_fingerprint":null,"choices":[{"index":0,"delta":{},"logprobs":null,"finish_reason":"tool_calls"}],"obfuscation":"F4WVUuVK6yg"} + + + data: [DONE] + + + ' + headers: + CF-RAY: + - 970a89b3ffad7d98-TLV + Connection: + - keep-alive + Content-Type: + - text/event-stream; charset=utf-8 + Date: + - Sun, 17 Aug 2025 16:27:16 GMT + Server: + - cloudflare + Set-Cookie: + - __cf_bm=wJjR0Xu057Zf_UQmoZWp3TkN0KNg2iXu4MmcFV7PDMo-1755448036-1.0.1.1-hmrKS7BrpsnXYvrimqZt2dGAz9tkQ6dqHrUH.aUFgU_2_2AdXHBdOeSbLXBR7FboG3pa5nrwcsygn3IG4xk9xMoqmUIeQvm0JjHe_wQyRMM; + path=/; expires=Sun, 17-Aug-25 16:57:16 GMT; domain=.api.openai.com; HttpOnly; + Secure; SameSite=None + - _cfuvid=Y6KGeOFFKbSGF9.tRLqFA3ghl7np4QoWbM1H_M1vrn8-1755448036979-0.0.1.1-604800000; + path=/; domain=.api.openai.com; HttpOnly; Secure; SameSite=None + Strict-Transport-Security: + - max-age=31536000; includeSubDomains; preload + Transfer-Encoding: + - chunked + X-Content-Type-Options: + - nosniff + access-control-expose-headers: + - X-Request-ID + alt-svc: + - h3=":443"; ma=86400 + cf-cache-status: + - DYNAMIC + openai-organization: + - traceloop + openai-processing-ms: + - '237' + openai-project: + - proj_tzz1TbPPOXaf6j9tEkVUBIAa + openai-version: + - '2020-10-01' + x-envoy-upstream-service-time: + - '295' + x-ratelimit-limit-requests: + - '10000' + x-ratelimit-limit-tokens: + - '50000000' + x-ratelimit-remaining-requests: + - '9999' + x-ratelimit-remaining-tokens: + - '49999992' + x-ratelimit-reset-requests: + - 6ms + x-ratelimit-reset-tokens: + - 0s + x-request-id: + - req_f8963d70be5b47829fea79154bbfa155 + status: + code: 200 + message: OK +version: 1 diff --git a/packages/opentelemetry-instrumentation-openai/tests/traces/test_legacy_api_response.py b/packages/opentelemetry-instrumentation-openai/tests/traces/test_legacy_api_response.py new file mode 100644 index 0000000000..93907d897d --- /dev/null +++ b/packages/opentelemetry-instrumentation-openai/tests/traces/test_legacy_api_response.py @@ -0,0 +1,289 @@ +import pytest +from unittest.mock import MagicMock + + +class MockLegacyAPIResponse: + """Mock LegacyAPIResponse for testing""" + + def __init__(self, stream): + self._stream = stream + + def parse(self): + return self._stream + + +def test_legacy_api_response_detection(): + """Test that LegacyAPIResponse objects are properly detected as streaming""" + from opentelemetry.instrumentation.openai.shared import is_streaming_response + + # Create a mock LegacyAPIResponse + mock_stream = MagicMock() + legacy_response = MockLegacyAPIResponse(mock_stream) + + # Test that 
is_streaming_response correctly identifies LegacyAPIResponse as streaming when stream=True
+    assert is_streaming_response(legacy_response, {"stream": True})
+
+    # Test that LegacyAPIResponse is not streaming when stream=False or not set
+    assert not is_streaming_response(legacy_response, {"stream": False})
+    assert not is_streaming_response(legacy_response, {})
+    assert not is_streaming_response(legacy_response)
+
+    # Test with regular objects
+    assert not is_streaming_response("not_a_stream")
+    assert not is_streaming_response({"not": "stream"})
+
+
+@pytest.mark.vcr
+def test_raw_response_header_streaming_completion_attributes(
+    instrument_legacy, span_exporter, log_exporter, openai_client
+):
+    """Test that streaming responses with the X-Stainless-Raw-Response header capture completion attributes"""
+    response = openai_client.chat.completions.create(
+        model="gpt-3.5-turbo",
+        messages=[{"role": "user", "content": "Say hello"}],
+        stream=True,
+        extra_headers={"X-Stainless-Raw-Response": "true"}
+    )
+
+    # If it's a LegacyAPIResponse, call parse() to get the actual stream
+    if hasattr(response, 'parse'):
+        actual_stream = response.parse()
+        # Consume the stream
+        for chunk in actual_stream:
+            pass
+    else:
+        # Regular streaming response
+        for chunk in response:
+            pass
+
+    spans = span_exporter.get_finished_spans()
+    assert len(spans) == 1
+
+    span = spans[0]
+    assert span.name == "openai.chat"
+
+    # Verify that completion attributes are captured
+    assert span.attributes.get("gen_ai.completion.0.finish_reason") is not None
+
+    # For text responses, there should be completion content
+    completion_content = span.attributes.get("gen_ai.completion.0.content")
+    assert completion_content is not None
+    assert isinstance(completion_content, str)
+    assert len(completion_content) > 0
+
+    # Usage tokens may or may not be present depending on the API response,
+    # but if they are present, they should be valid
+    completion_tokens = span.attributes.get("gen_ai.usage.completion_tokens")
+    if completion_tokens is not None:
+        assert isinstance(completion_tokens, int)
+        assert completion_tokens > 0
+
+
+@pytest.mark.vcr
+def test_raw_response_header_tool_calls_completion_attributes(
+    instrument_legacy, span_exporter, log_exporter, openai_client
+):
+    """Test that a LegacyAPIResponse with tool calls captures completion attributes"""
+    response = openai_client.chat.completions.create(
+        model="gpt-3.5-turbo",
+        messages=[{"role": "user", "content": "What's the weather like?"}],
+        tools=[{
+            "type": "function",
+            "function": {
+                "name": "get_weather",
+                "description": "Get weather for a location",
+                "parameters": {
+                    "type": "object",
+                    "properties": {
+                        "location": {"type": "string"}
+                    }
+                }
+            }
+        }],
+        stream=True,
+        extra_headers={"X-Stainless-Raw-Response": "true"}
+    )
+
+    # If it's a LegacyAPIResponse, call parse() to get the actual stream
+    if hasattr(response, 'parse'):
+        actual_stream = response.parse()
+        # Consume the stream
+        for chunk in actual_stream:
+            pass
+    else:
+        # Regular streaming response
+        for chunk in response:
+            pass
+
+    spans = span_exporter.get_finished_spans()
+    assert len(spans) == 1
+
+    span = spans[0]
+    assert span.name == "openai.chat"
+
+    # Verify completion attributes are captured
+    assert span.attributes.get("gen_ai.completion.0.finish_reason") is not None
+
+    # Should have tool call attributes if the model chose to call tools
+    finish_reason = span.attributes.get("gen_ai.completion.0.finish_reason")
+    if finish_reason == "tool_calls":
+        assert span.attributes.get("gen_ai.completion.0.tool_calls.0.name") is not None
+        assert span.attributes.get("gen_ai.completion.0.tool_calls.0.id") is not None
+
+    # Usage tokens may or may not be present depending on the API response,
+    # but if they are present, they should be valid
+    completion_tokens = span.attributes.get("gen_ai.usage.completion_tokens")
+    if completion_tokens is not None:
+        assert isinstance(completion_tokens, int)
+        assert completion_tokens > 0
+
+
+def test_legacy_api_response_parse_method():
+    """Test that ChatStream.parse() transfers completion data correctly"""
+    from opentelemetry.instrumentation.openai.shared.chat_wrappers import ChatStream
+
+    # Create a mock span
+    mock_span = MagicMock()
+    mock_span.is_recording.return_value = True
+
+    # Create a mock LegacyAPIResponse
+    mock_stream = MagicMock()
+    legacy_response = MockLegacyAPIResponse(mock_stream)
+
+    # Create a ChatStream that wraps the LegacyAPIResponse
+    chat_stream = ChatStream(
+        span=mock_span,
+        response=legacy_response,
+        instance=MagicMock(),
+        token_counter=MagicMock(),
+        choice_counter=MagicMock(),
+        duration_histogram=MagicMock(),
+        streaming_time_to_first_token=MagicMock(),
+        streaming_time_to_generate=MagicMock(),
+        start_time=1234567890,
+        request_kwargs={}
+    )
+
+    # Add some accumulated data to the original stream
+    chat_stream._complete_response = {
+        "choices": [{"message": {"content": "test"}}],
+        "usage": {"completion_tokens": 5, "prompt_tokens": 10}
+    }
+
+    # Call parse(), which should transfer the data to a new ChatStream
+    new_stream = chat_stream.parse()
+
+    # Verify that the new stream has the transferred data
+    assert new_stream._complete_response == chat_stream._complete_response
+
+    # Verify that the original stream is marked as cleanup completed
+    assert chat_stream._cleanup_completed is True
+
+
+def test_legacy_api_response_model_as_dict():
+    """Test that model_as_dict handles LegacyAPIResponse correctly"""
+    from opentelemetry.instrumentation.openai.shared import model_as_dict
+
+    # Create a mock LegacyAPIResponse
+    mock_stream = MagicMock()
+    legacy_response = MockLegacyAPIResponse(mock_stream)
+
+    # Test that model_as_dict returns an empty dict for a streaming LegacyAPIResponse
+    result = model_as_dict(legacy_response, is_streaming=True)
+    assert result == {}
+
+    # Test that model_as_dict processes a non-streaming LegacyAPIResponse normally:
+    # it should call parse() and continue processing the parsed model
+    mock_stream.model_dump.return_value = {"key": "value"}
+    result = model_as_dict(legacy_response, is_streaming=False)
+    # For non-streaming, it should parse and process normally - returns the model_dump result
+    assert result == {"key": "value"}
+
+    # Test with a regular dict
+    regular_dict = {"key": "value"}
+    result = model_as_dict(regular_dict)
+    assert result == regular_dict
+
+
+def test_original_stream_integrity():
+    """Test that the original LegacyAPIResponse is not mutated"""
+    from opentelemetry.instrumentation.openai.shared import _is_legacy_api_response
+
+    # Create a mock LegacyAPIResponse
+    mock_stream = MagicMock()
+    legacy_response = MockLegacyAPIResponse(mock_stream)
+    original_type = type(legacy_response)
+
+    # Verify it's detected correctly
+    assert _is_legacy_api_response(legacy_response)
+
+    # Call the parse() method
+    parsed_stream = legacy_response.parse()
+
+    # Verify the original object is unchanged
+    assert type(legacy_response) is original_type
+    assert hasattr(legacy_response, 'parse')
+    assert legacy_response._stream is mock_stream
+
+    # Verify the parsed stream is a different object
+    assert parsed_stream is mock_stream
+    assert parsed_stream is not legacy_response
+
+
+def test_multiple_stream_consumers():
+    """Test that multiple consumers can safely iterate the same LegacyAPIResponse"""
+    from opentelemetry.instrumentation.openai.shared.chat_wrappers import ChatStream
+
+    # Create a mock span and stream
+    mock_span = MagicMock()
+    mock_span.is_recording.return_value = True
+
+    # Create a mock iterable stream that can be consumed multiple times
+    # (MagicMock wraps assigned magic methods, so the lambda receives self)
+    mock_data = [{"chunk": 1}, {"chunk": 2}, {"chunk": 3}]
+    mock_stream = MagicMock()
+    mock_stream.__iter__ = lambda self: iter(mock_data)
+
+    legacy_response = MockLegacyAPIResponse(mock_stream)
+
+    # Test that the original response can still be parsed after ChatStream creation
+    chat_stream = ChatStream(
+        span=mock_span,
+        response=legacy_response.parse(),  # Parse directly, don't rely on ChatStream mutation
+        instance=MagicMock(),
+        token_counter=MagicMock(),
+        choice_counter=MagicMock(),
+        duration_histogram=MagicMock(),
+        streaming_time_to_first_token=MagicMock(),
+        streaming_time_to_generate=MagicMock(),
+        start_time=1234567890,
+        request_kwargs={}
+    )
+
+    # Verify the original legacy_response can still be parsed
+    second_stream = legacy_response.parse()
+    assert second_stream is mock_stream
+
+    # Verify the ChatStream wraps the correctly parsed stream
+    assert chat_stream.__wrapped__ is mock_stream
+
+
+def test_legacy_api_response_error_handling():
+    """Test error handling when LegacyAPIResponse.parse() fails"""
+    from opentelemetry.instrumentation.openai.shared import _is_legacy_api_response
+
+    # Create a mock LegacyAPIResponse that fails to parse
+    class FailingLegacyAPIResponse:
+        def parse(self):
+            raise ValueError("Parse failed")
+
+    failing_response = FailingLegacyAPIResponse()
+
+    # Verify it's detected as a LegacyAPIResponse
+    assert _is_legacy_api_response(failing_response)
+
+    # Verify that the parse failure propagates here; the wrapper catches it
+    # and falls back to the original response
+    with pytest.raises(ValueError):
+        failing_response.parse()
\ No newline at end of file
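For reviewers who want to reproduce the scenario outside of VCR: the tests above trigger the raw-response path directly through `extra_headers`, which is the same `X-Stainless-Raw-Response: true` header the SDK's `with_raw_response` helper (and therefore LiteLLM) sets. A minimal end-to-end sketch, assuming a valid `OPENAI_API_KEY` and the instrumentation installed:

```python
import openai
from traceloop.sdk import Traceloop

Traceloop.init(app_name="raw-response-demo")

client = openai.OpenAI()
response = client.chat.completions.create(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "Say hello"}],
    stream=True,
    extra_headers={"X-Stainless-Raw-Response": "true"},
)

# The instrumented wrapper returns a ChatStream; if the SDK handed back a
# LegacyAPIResponse, parse() unwraps it (and transfers span accumulation).
stream = response.parse() if hasattr(response, "parse") else response
print("".join(
    chunk.choices[0].delta.content or ""
    for chunk in stream
    if chunk.choices
))
```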
diff --git a/packages/sample-app/sample_app/strands_example.py b/packages/sample-app/sample_app/strands_example.py
new file mode 100644
index 0000000000..9dba717c12
--- /dev/null
+++ b/packages/sample-app/sample_app/strands_example.py
@@ -0,0 +1,410 @@
+import os
+import json
+import requests
+from pathlib import Path
+from datetime import datetime
+from typing import Any
+from strands import Agent, tool
+from strands.models.litellm import LiteLLMModel
+from traceloop.sdk import Traceloop
+
+from dotenv import load_dotenv
+
+load_dotenv()
+
+Traceloop.init(
+    app_name="travel-planning-multi-agent"
+)
+
+# Shared memory store for multi-agent coordination
+shared_memory = {
+    "conversation_history": [],
+    "user_preferences": {},
+    "research_data": {},
+    "trip_plans": {},
+    "bookings": []
+}
+
+def add_to_memory(agent_name: str, action: str, data: Any):
+    """Add information to shared memory."""
+    entry = {
+        "timestamp": datetime.now().isoformat(),
+        "agent": agent_name,
+        "action": action,
+        "data": data
+    }
+    shared_memory["conversation_history"].append(entry)
+    return entry
+
+
+# Travel Research Agent Tools
+@tool
+def research_destination(destination: str) -> str:
+    """Research travel destinations, attractions, and activities."""
+    try:
+        query = f"{destination} travel guide 
attractions activities weather best time to visit"
+        response = requests.get(
+            "https://api.duckduckgo.com/",
+            params={"q": query, "format": "json", "no_html": "1", "skip_disambig": "1"},
+            timeout=10
+        )
+        data = response.json()
+
+        results = []
+        if "RelatedTopics" in data:
+            for topic in data["RelatedTopics"][:3]:
+                if "Text" in topic:
+                    results.append(topic["Text"])
+
+        research_info = "\n".join(results) if results else f"Limited information available for {destination}"
+        add_to_memory("research_agent", "destination_research", {"destination": destination, "info": research_info})
+        return research_info
+    except Exception as e:
+        return f"Research error: {str(e)}"
+
+
+@tool
+def get_weather_forecast(city: str, dates: str = "") -> str:
+    """Get weather information for travel planning."""
+    weather_info = f"Weather forecast for {city}: Generally pleasant with temperatures 20-25Β°C. {dates} looks good for travel with sunny skies and light winds."
+    add_to_memory("research_agent", "weather_check", {"city": city, "dates": dates, "forecast": weather_info})
+    return weather_info
+
+
+# Trip Planning Agent Tools
+@tool
+def create_itinerary(destination: str, duration: str, interests: str) -> str:
+    """Create a detailed travel itinerary based on research and preferences."""
+    itinerary = f"""πŸ—“οΈ {duration} Itinerary for {destination}
+
+Based on your interests in {interests}:
+
+Day 1: Arrival & City Orientation
+- Arrive and check into hotel
+- Walking tour of main attractions
+- Local cuisine dinner
+
+Day 2: Cultural Exploration
+- Museums and historical sites
+- Local markets and shopping
+- Cultural show or performance
+
+Day 3: Adventure & Activities
+- Outdoor activities based on interests
+- Local experiences and tours
+- Sunset viewing location
+
+Day 4: Relaxation & Departure
+- Leisure morning
+- Final shopping/souvenirs
+- Departure preparations
+
+Note: Itinerary customized based on {interests} preferences."""
+
+    add_to_memory("planning_agent", "itinerary_created", {"destination": destination, "duration": duration, "itinerary": itinerary})
+    shared_memory["trip_plans"][destination] = itinerary
+    return itinerary
+
+
+@tool
+def estimate_budget(destination: str, duration: str, travelers: int = 1) -> str:
+    """Estimate the travel budget for the trip."""
+    days = int(duration.split()[0]) if duration.split()[0].isdigit() else 3
+
+    accommodation = days * 100 * travelers
+    meals = days * 50 * travelers
+    activities = days * 75 * travelers
+    transport = 500 * travelers
+
+    total = accommodation + meals + activities + transport
+
+    budget_breakdown = f"""πŸ’° Budget Estimate for {destination} ({duration})
+
+Accommodation: ${accommodation}
+Meals: ${meals}
+Activities: ${activities}
+Transport: ${transport}
+{'─' * 30}
+Total: ${total} for {travelers} traveler(s)
+
+Note: Estimates in USD, actual costs may vary."""
+
+    add_to_memory("planning_agent", "budget_estimated", {"destination": destination, "total": total, "breakdown": budget_breakdown})
+    return budget_breakdown
+
+
+# Booking Agent Tools
+@tool
+def search_flights(origin: str, destination: str, dates: str) -> str:
+    """Search for flight options and prices."""
+    flight_results = f"""✈️ Flight Search Results
+
+Route: {origin} β†’ {destination}
+Dates: {dates}
+
+🏷️ Economy Options:
+1. DirectFly Airlines - $450 (Direct, 6h 30m)
+2. ConnectAir - $320 (1 stop, 8h 45m)
+3. 
BudgetWings - $280 (2 stops, 12h 15m)
+
+🌟 Business Class:
+1. PremiumAir - $1,200 (Direct, 6h 15m)
+2. ComfortFly - $980 (1 stop, 8h 30m)
+
+Note: Prices are estimates. Book within 24h to secure these rates."""
+
+    add_to_memory("booking_agent", "flight_search", {"origin": origin, "destination": destination, "dates": dates, "results": flight_results})
+    return flight_results
+
+
+@tool
+def search_hotels(destination: str, checkin: str, checkout: str, guests: int = 1) -> str:
+    """Search for hotel accommodations."""
+    hotel_results = f"""🏨 Hotel Search Results for {destination}
+
+Dates: {checkin} to {checkout}
+Guests: {guests}
+
+⭐⭐⭐⭐⭐ Luxury:
+1. Grand Palace Hotel - $350/night (Downtown, Spa, Pool)
+2. Skyline Resort - $280/night (City View, Gym, Restaurant)
+
+⭐⭐⭐⭐ Mid-Range:
+3. Comfort Inn Central - $150/night (Central Location, Breakfast)
+4. Urban Stay Hotel - $120/night (Modern, WiFi, Parking)
+
+⭐⭐⭐ Budget:
+5. Traveler's Lodge - $80/night (Clean, Basic, Good Reviews)
+6. City Hostel Plus - $45/night (Shared/Private Rooms)
+
+All hotels include WiFi. Prices per night before taxes."""
+
+    add_to_memory("booking_agent", "hotel_search", {"destination": destination, "checkin": checkin, "checkout": checkout, "results": hotel_results})
+    return hotel_results
+
+
+# Agent Coordination Tools
+@tool
+def handoff_to_research_agent(query: str) -> str:
+    """Hand off destination research tasks to the research specialist."""
+    add_to_memory("coordinator", "handoff_to_research", {"query": query})
+    return f"πŸ”„ Handed off to Research Agent: {query}"
+
+
+@tool
+def handoff_to_planning_agent(query: str) -> str:
+    """Hand off itinerary and planning tasks to the planning specialist."""
+    add_to_memory("coordinator", "handoff_to_planning", {"query": query})
+    return f"πŸ”„ Handed off to Planning Agent: {query}"
+
+
+@tool
+def handoff_to_booking_agent(query: str) -> str:
+    """Hand off booking and reservation tasks to the booking specialist."""
+    add_to_memory("coordinator", "handoff_to_booking", {"query": query})
+    return f"πŸ”„ Handed off to Booking Agent: {query}"
+
+
+@tool
+def get_memory_context() -> str:
+    """Retrieve relevant context from shared memory."""
+    recent_history = shared_memory["conversation_history"][-5:] if shared_memory["conversation_history"] else []
+    context = "Recent Activity:\n"
+    for entry in recent_history:
+        context += f"- {entry['agent']}: {entry['action']} at {entry['timestamp']}\n"
+    return context
+
+
+@tool
+def save_trip_plan(destination: str, plan_data: str) -> str:
+    """Save the complete trip plan to a file."""
+    try:
+        filename = f"trip_plan_{destination.lower().replace(' ', '_')}_{datetime.now().strftime('%Y%m%d')}.txt"
+        path = Path(filename)
+
+        full_plan = f"""Travel Plan for {destination}
+Generated: {datetime.now().strftime('%Y-%m-%d %H:%M')}
+
+{plan_data}
+
+--- Memory Context ---
+{json.dumps(shared_memory, indent=2)}"""
+
+        path.write_text(full_plan, encoding='utf-8')
+        add_to_memory("system", "plan_saved", {"destination": destination, "filename": filename})
+        return f"βœ… Trip plan saved to {filename}"
+    except Exception as e:
+        return f"Error saving plan: {str(e)}"
+
+
+@tool
+def search_web(query: str, topic: str = "general") -> str:
+    """Search the web for real-time information using the Tavily API."""
+    try:
+        tavily_api_key = os.getenv("TAVILY_API_KEY")
+        if not tavily_api_key:
+            return "❌ Tavily API key required. Set TAVILY_API_KEY environment variable."
+
+        response = requests.post(
+            "https://api.tavily.com/search",
+            headers={
+                "Authorization": f"Bearer {tavily_api_key}",
+                "Content-Type": "application/json"
+            },
+            json={
+                "query": query,
+                "search_depth": "basic",
+                "max_results": 3,
+                "include_answer": True,
+                "topic": topic
+            },
+            timeout=10
+        )
+
+        if response.status_code == 200:
+            data = response.json()
+            results = f"🌐 **Web Search Results for:** {query}\n\n"
+
+            if "answer" in data and data["answer"]:
+                results += f"**Answer:** {data['answer']}\n\n"
+
+            if "results" in data:
+                results += "**Sources:**\n"
+                for i, result in enumerate(data["results"], 1):
+                    title = result.get("title", "Result")
+                    content = result.get("content", "")[:100] + "..."
+                    results += f"{i}. {title}\n   {content}\n\n"
+
+            return results
+        else:
+            return f"❌ Search API error: {response.status_code}"
+
+    except Exception as e:
+        return f"Search error: {str(e)}"
+
+
+def create_agents():
+    """Create specialized travel planning agents."""
+    openai_api_key = os.getenv("OPENAI_API_KEY")
+    if not openai_api_key:
+        print("Please set OPENAI_API_KEY environment variable")
+        exit(1)
+
+    # Configure the LiteLLM model
+    model = LiteLLMModel(
+        model_id="gpt-4o-mini",
+        params={"max_tokens": 1500, "temperature": 0.7}
+    )
+
+    # Research Agent - with real web search capabilities
+    research_agent = Agent(
+        model=model,
+        tools=[research_destination, get_weather_forecast, search_web, get_memory_context],
+        system_prompt="You are a Travel Research Specialist with access to real-time web data. Your expertise includes:\n" +
+                      "- Live destination research using current web information\n" +
+                      "- Real weather forecasts and climate data\n" +
+                      "- Up-to-date travel advisories and requirements\n" +
+                      "- Current events and local information\n" +
+                      "Always provide accurate, real-time travel information."
+    )
+
+    # Planning Agent - specialized in itinerary creation
+    planning_agent = Agent(
+        model=model,
+        tools=[create_itinerary, estimate_budget, get_memory_context, save_trip_plan],
+        system_prompt="You are a Trip Planning Specialist. Your expertise includes:\n" +
+                      "- Creating detailed, personalized itineraries\n" +
+                      "- Budget estimation and financial planning\n" +
+                      "- Optimizing travel schedules and logistics\n" +
+                      "- Balancing activities with traveler preferences\n" +
+                      "Create well-structured, realistic travel plans."
+    )
+
+    # Booking Agent - specialized in reservations
+    booking_agent = Agent(
+        model=model,
+        tools=[search_flights, search_hotels, get_memory_context],
+        system_prompt="You are a Travel Booking Specialist. Your expertise includes:\n" +
+                      "- Finding the best flight deals and options\n" +
+                      "- Locating suitable accommodations\n" +
+                      "- Comparing prices and amenities\n" +
+                      "- Providing booking recommendations and alternatives\n" +
+                      "Help travelers find the best deals within their budget."
+    )
+
+    # Coordinator Agent - orchestrates the team
+    coordinator_agent = Agent(
+        model=model,
+        tools=[handoff_to_research_agent, handoff_to_planning_agent, handoff_to_booking_agent, get_memory_context],
+        system_prompt="You are the Travel Coordinator. You orchestrate a team of specialists:\n" +
+                      "- Research Agent: destination info, weather, attractions\n" +
+                      "- Planning Agent: itineraries, budgets, logistics\n" +
+                      "- Booking Agent: flights, hotels, reservations\n" +
+                      "\nAnalyze user requests and delegate to the appropriate specialist. " +
+                      "Coordinate between agents to provide comprehensive travel assistance."
+    )
+
+    return {
+        "research": research_agent,
+        "planning": planning_agent,
+        "booking": booking_agent,
+        "coordinator": coordinator_agent
+    }
+
+
+def demo_multi_agent_conversation(agents):
+    """Demonstrate a multi-agent travel planning workflow with real APIs."""
+    print("🌍 Welcome to the Multi-Agent Travel Planning System!\n")
+    print("⚑ Now powered by real-time web search and live data!\n")
+
+    # Check API keys
+    tavily_key = os.getenv("TAVILY_API_KEY")
+    weather_key = os.getenv("OPENWEATHER_API_KEY")
+
+    print("πŸ”‘ API Status:")
+    print(f"   Tavily Search: {'βœ… Ready' if tavily_key else '❌ Missing TAVILY_API_KEY'}")
+    print(f"   Weather Data: {'βœ… Ready' if weather_key else '❌ Missing OPENWEATHER_API_KEY'}")
+    print()
+
+    if not tavily_key:
+        print("πŸ’‘ To get real search results, sign up at https://tavily.com and set TAVILY_API_KEY")
+        print("πŸ’‘ For weather data, get a free API key at https://openweathermap.org and set OPENWEATHER_API_KEY\n")
+
+    # Simulate a complex travel planning request
+    user_request = "I want to plan a 4-day trip to Tokyo in April 2025. I'm interested in technology, food, and traditional culture. I need help with research, planning, and finding flights from San Francisco."
+
+    print(f"User Request: {user_request}\n")
+    print("=" * 80)
+
+    # Step 1: Coordinator analyzes and delegates
+    print("πŸ€– Travel Coordinator analyzing request...")
+    coord_response = agents["coordinator"](user_request)
+    print(f"Coordinator: {coord_response.message['content'][0]['text']}\n")
+
+    # Step 2: Research Agent gathers information
+    print("πŸ” Research Agent investigating Tokyo...")
+    research_response = agents["research"]("Research Tokyo for a technology and culture enthusiast visiting in April")
+    print(f"Research Agent: {research_response.message['content'][0]['text']}\n")
+
+    # Step 3: Planning Agent creates the itinerary
+    print("πŸ—ΊοΈ Planning Agent creating itinerary...")
+    planning_response = agents["planning"]("Create a 4-day Tokyo itinerary focusing on technology, food, and traditional culture")
+    print(f"Planning Agent: {planning_response.message['content'][0]['text']}\n")
+
+    # Step 4: Booking Agent finds options
+    print("πŸ’Ό Booking Agent searching flights and hotels...")
+    booking_response = agents["booking"]("Find flights from San Francisco to Tokyo in April and hotel options")
+    print(f"Booking Agent: {booking_response.message['content'][0]['text']}\n")
+
+    print("=" * 80)
+    print("πŸŽ‰ Multi-agent travel planning complete!")
+    print(f"\nπŸ“Š Memory entries created: {len(shared_memory['conversation_history'])}")
+
+
+if __name__ == "__main__":
+    agents = create_agents()
+    demo_multi_agent_conversation(agents)
\ No newline at end of file
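The next sample drives the OpenAI client directly; the strands example above is the one that matters for this PR, because `LiteLLMModel` routes through LiteLLM, which calls the OpenAI SDK's raw-response API and therefore exercises the new `LegacyAPIResponse` handling. A condensed, paraphrased sketch of that call path (LiteLLM internals simplified; `litellm_style_completion` is illustrative, not a real LiteLLM function):

```python
import openai

def litellm_style_completion(client: openai.OpenAI, **kwargs):
    # LiteLLM requests the raw response so it can inspect the HTTP layer...
    raw = client.chat.completions.with_raw_response.create(**kwargs)
    # ...and then parses it to obtain the Stream/ChatCompletion object. With
    # the instrumentation active and stream=True, `raw` is a ChatStream
    # wrapper, and this parse() call is what ChatStream.parse() (added in
    # chat_wrappers.py above) intercepts to hand off span accumulation.
    return raw.parse()
```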
"""Log agent actions.""" + entry = { + "timestamp": datetime.now().isoformat(), + "agent": agent_name, + "action": action, + "data": data + } + agent_memory["conversation_log"].append(entry) + return entry + +# Define tools for OpenAI function calling +@tool(name="tavily_search") +def search_tavily(query: str) -> str: + """Search the web using Tavily API for real-time travel information.""" + try: + api_key = os.getenv("TAVILY_API_KEY") + if not api_key: + return "❌ Tavily API key required" + + response = requests.post( + "https://api.tavily.com/search", + headers={ + "Authorization": f"Bearer {api_key}", + "Content-Type": "application/json" + }, + json={ + "query": query, + "search_depth": "advanced", + "max_results": 3, + "include_answer": True + }, + timeout=15 + ) + + if response.status_code == 200: + data = response.json() + result = f"🌐 **Search:** {query}\n\n" + + if "answer" in data and data["answer"]: + result += f"**Summary:** {data['answer']}\n\n" + + if "results" in data: + for i, res in enumerate(data["results"], 1): + title = res.get("title", "")[:50] + "..." + content = res.get("content", "")[:150] + "..." + url = res.get("url", "") + result += f"{i}. **{title}**\n {content}\n πŸ”— {url}\n\n" + + log_action("tavily_search", "search_completed", {"query": query}) + return result + else: + return f"❌ Search failed: {response.status_code}" + + except Exception as e: + return f"Search error: {str(e)}" + +@tool(name="weather_api") +def get_weather(city: str) -> str: + """Get current weather data.""" + try: + api_key = os.getenv("OPENWEATHER_API_KEY") + if not api_key: + return f"🌀️ Weather info for {city}: Generally pleasant in April, 20-25Β°C (simulated data - no API key)" + + response = requests.get( + "https://api.openweathermap.org/data/2.5/weather", + params={"q": city, "appid": api_key, "units": "metric"}, + timeout=10 + ) + + if response.status_code == 200: + data = response.json() + weather = f"🌀️ **Current Weather in {city}:**\n" + weather += f"Temp: {data['main']['temp']}Β°C, {data['weather'][0]['description']}" + log_action("weather_api", "weather_fetched", {"city": city, "temp": data['main']['temp']}) + return weather + else: + return f"🌀️ Weather for {city}: April typically 20-25Β°C, pleasant conditions (API unavailable)" + + except Exception as e: + return f"🌀️ Weather for {city}: April typically 20-25Β°C (error: {str(e)[:50]})" + +@tool(name="save_file") +def save_travel_plan(filename: str, content: str) -> str: + """Save travel plan to a file.""" + try: + path = Path(filename) + path.write_text(content, encoding='utf-8') + log_action("file_save", "plan_saved", {"filename": filename}) + return f"βœ… Saved to {filename}" + except Exception as e: + return f"❌ Save error: {str(e)}" + +# OpenAI tool definitions for function calling +TOOL_DEFINITIONS = [ + { + "type": "function", + "function": { + "name": "search_tavily", + "description": "Search the web for real-time travel information", + "parameters": { + "type": "object", + "properties": { + "query": { + "type": "string", + "description": "Search query for travel information" + } + }, + "required": ["query"] + } + } + }, + { + "type": "function", + "function": { + "name": "get_weather", + "description": "Get current weather data for a city", + "parameters": { + "type": "object", + "properties": { + "city": { + "type": "string", + "description": "City name to get weather for" + } + }, + "required": ["city"] + } + } + }, + { + "type": "function", + "function": { + "name": "save_travel_plan", + "description": "Save 
+            "parameters": {
+                "type": "object",
+                "properties": {
+                    "filename": {
+                        "type": "string",
+                        "description": "Filename to save the plan"
+                    },
+                    "content": {
+                        "type": "string",
+                        "description": "Content to save"
+                    }
+                },
+                "required": ["filename", "content"]
+            }
+        }
+    }
+]
+
+# Function registry for tool calling
+FUNCTION_REGISTRY = {
+    "search_tavily": search_tavily,
+    "get_weather": get_weather,
+    "save_travel_plan": save_travel_plan
+}
+
+def execute_function_call(function_call) -> str:
+    """Execute OpenAI function calls."""
+    function_name = function_call.function.name
+    arguments = json.loads(function_call.function.arguments)
+
+    if function_name in FUNCTION_REGISTRY:
+        return FUNCTION_REGISTRY[function_name](**arguments)
+    else:
+        return f"❌ Unknown function: {function_name}"
+
+@agent(name="travel_agent_with_tools")
+def travel_planning_agent(user_request: str) -> str:
+    """AI travel agent with access to real tools via OpenAI function calling."""
+    log_action("travel_agent", "started", {"request": user_request})
+
+    api_key = os.getenv("OPENAI_API_KEY")
+    if not api_key:
+        return "❌ OpenAI API key required"
+
+    client = openai.OpenAI(api_key=api_key)
+
+    system_prompt = """You are an expert travel planning agent with access to real-time tools:
+
+    AVAILABLE TOOLS:
+    - search_tavily: Search the web for current travel information
+    - get_weather: Get real weather data for any city
+    - save_travel_plan: Save complete travel plans to files
+
+    PROCESS:
+    1. Use search_tavily to research destinations, attractions, and travel info
+    2. Use get_weather to get current conditions
+    3. Create comprehensive travel plans
+    4. Save the final plan using save_travel_plan
+
+    Be thorough, use multiple searches, and provide detailed, actionable travel advice."""
+
+    try:
+        print(f"πŸ”§ **Initializing OpenAI with {len(TOOL_DEFINITIONS)} tool definitions**")
+
+        # Initial conversation with tools
+        response = client.chat.completions.create(
+            model="gpt-4o-mini",
+            messages=[
+                {"role": "system", "content": system_prompt},
+                {"role": "user", "content": user_request}
+            ],
+            tools=TOOL_DEFINITIONS,
+            tool_choice="auto",
+            max_tokens=1500
+        )
+
+        conversation = [
+            {"role": "system", "content": system_prompt},
+            {"role": "user", "content": user_request}
+        ]
+
+        # Process tool calls
+        while response.choices[0].message.tool_calls:
+            message = response.choices[0].message
+            conversation.append({
+                "role": "assistant",
+                "content": message.content,
+                "tool_calls": [
+                    {
+                        "id": tc.id,
+                        "type": tc.type,
+                        "function": {
+                            "name": tc.function.name,
+                            "arguments": tc.function.arguments
+                        }
+                    } for tc in message.tool_calls
+                ]
+            })
+
+            # Execute each tool call
+            for tool_call in message.tool_calls:
+                print(f"πŸ”§ Calling tool: {tool_call.function.name}")
+                tool_result = execute_function_call(tool_call)
+                print(f"πŸ“Š Result: {tool_result[:100]}...\n")
+
+                conversation.append({
+                    "role": "tool",
+                    "tool_call_id": tool_call.id,
+                    "content": tool_result
+                })
+
+            # Continue the conversation with tool results
+            response = client.chat.completions.create(
+                model="gpt-4o-mini",
+                messages=conversation,
+                tools=TOOL_DEFINITIONS,
+                tool_choice="auto",
+                max_tokens=1500
+            )
+
+        # Final response
+        final_response = response.choices[0].message.content
+        log_action("travel_agent", "completed", {"tools_used": len([msg for msg in conversation if msg.get("role") == "tool"])})
+
+        return final_response
+
+    except Exception as e:
+        return f"❌ Agent error: {str(e)}"
+
+@workflow(name="travel_planning_workflow")
+def plan_travel(user_request: str) -> str:
+    """Main workflow using OpenAI tool calling."""
+    print("πŸš€ Starting Travel Planning with OpenAI Tool Calling\n")
+
+    # Check API status
+    openai_key = bool(os.getenv("OPENAI_API_KEY"))
+    tavily_key = bool(os.getenv("TAVILY_API_KEY"))
+    weather_key = bool(os.getenv("OPENWEATHER_API_KEY"))
+
+    print("πŸ”‘ API Status:")
+    print(f"   OpenAI: {'βœ… Ready' if openai_key else '❌ Missing'}")
+    print(f"   Tavily: {'βœ… Ready' if tavily_key else '❌ Missing'}")
+    print(f"   Weather: {'βœ… Ready' if weather_key else '❌ Missing'}")
+    print()
+
+    if not openai_key:
+        return "❌ OpenAI API key required for tool calling"
+
+    print(f"πŸ“ **Request:** {user_request}\n")
+    print("=" * 60)
+
+    # Run the travel agent with OpenAI tool calling
+    result = travel_planning_agent(user_request)
+
+    print("\n" + "=" * 60)
+    print(f"πŸ“Š **Stats:** {len(agent_memory['conversation_log'])} actions logged")
+
+    return result
+
+if __name__ == "__main__":
+    travel_request = "Plan a 4-day trip to Tokyo in April 2025. I'm interested in technology, food, and traditional culture. Find flights from San Francisco and recommend hotels."
+
+    final_result = plan_travel(travel_request)
+
+    print("\nπŸŽ‰ **Final Result:**")
+    print(final_result)
\ No newline at end of file
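As a quick smoke test of the tool-dispatch plumbing in the sample above, runnable locally without any API keys (the `SimpleNamespace` stand-in mimics the SDK's tool-call shape, and `save_travel_plan` only writes to disk; note that importing the module triggers its `Traceloop.init` side effect):

```python
import json
from types import SimpleNamespace

from sample_app.traceloop_openai_tools import execute_function_call

# A stand-in for an OpenAI tool call object (tc.function.name / .arguments).
fake_call = SimpleNamespace(
    function=SimpleNamespace(
        name="save_travel_plan",
        arguments=json.dumps(
            {"filename": "demo_plan.txt", "content": "Tokyo, 4 days"}
        ),
    )
)

print(execute_function_call(fake_call))  # -> βœ… Saved to demo_plan.txt
```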