diff --git a/instrumentation-genai/opentelemetry-instrumentation-cohere/LICENSE b/instrumentation-genai/opentelemetry-instrumentation-cohere/LICENSE new file mode 100644 index 0000000000..261eeb9e9f --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-cohere/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). 
+ + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. 
Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative 
Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. 
Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. 
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!) The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
diff --git a/instrumentation-genai/opentelemetry-instrumentation-cohere/README.rst b/instrumentation-genai/opentelemetry-instrumentation-cohere/README.rst
new file mode 100644
index 0000000000..2ae12a795d
--- /dev/null
+++ b/instrumentation-genai/opentelemetry-instrumentation-cohere/README.rst
@@ -0,0 +1,41 @@
+OpenTelemetry Cohere Instrumentation
+=====================================
+
+|pypi|
+
+.. |pypi| image:: https://badge.fury.io/py/opentelemetry-instrumentation-cohere.svg
+   :target: https://pypi.org/project/opentelemetry-instrumentation-cohere/
+
+This library allows tracing applications that use the `Cohere Python SDK <https://pypi.org/project/cohere/>`_.
+
+Installation
+------------
+
+::
+
+    pip install opentelemetry-instrumentation-cohere
+
+Usage
+-----
+
+.. code-block:: python
+
+    from cohere import ClientV2
+    from opentelemetry.instrumentation.cohere import CohereInstrumentor
+
+    CohereInstrumentor().instrument()
+
+    client = ClientV2()
+    response = client.chat(
+        model="command-r-plus",
+        messages=[
+            {"role": "user", "content": "Hello, how are you?"},
+        ],
+    )
+
+References
+----------
+
+* `OpenTelemetry Cohere Instrumentation <https://github.com/open-telemetry/opentelemetry-python-contrib/tree/main/instrumentation-genai/opentelemetry-instrumentation-cohere>`_
+* `OpenTelemetry Project <https://opentelemetry.io/>`_
+* `OpenTelemetry Python Examples <https://github.com/open-telemetry/opentelemetry-python/tree/main/docs/examples>`_
diff --git a/instrumentation-genai/opentelemetry-instrumentation-cohere/pyproject.toml b/instrumentation-genai/opentelemetry-instrumentation-cohere/pyproject.toml
new file mode 100644
index 0000000000..09aa4ed3dd
--- /dev/null
+++ b/instrumentation-genai/opentelemetry-instrumentation-cohere/pyproject.toml
@@ -0,0 +1,58 @@
+[build-system]
+requires = ["hatchling"]
+build-backend = "hatchling.build"
+
+[project]
+name = "opentelemetry-instrumentation-cohere"
+dynamic = ["version"]
+description = "OpenTelemetry Cohere instrumentation"
+readme = "README.rst"
+license = "Apache-2.0"
+requires-python = ">=3.9"
+authors = [
+  { name = "OpenTelemetry Authors", email = "cncf-opentelemetry-contributors@lists.cncf.io" },
+]
+classifiers = [
+  "Development Status :: 4 - Beta",
+  "Intended Audience :: Developers",
+  "License :: OSI Approved :: Apache Software License",
+  "Programming Language :: Python",
+  "Programming Language :: Python :: 3",
+  "Programming Language :: Python :: 3.9",
+  "Programming Language :: Python :: 3.10",
+  "Programming Language :: Python :: 3.11",
+  "Programming Language :: Python :: 3.12",
+  "Programming Language :: Python :: 3.13",
+  "Programming Language :: Python :: 3.14",
+]
+dependencies = [
+  "opentelemetry-api ~= 1.39",
+  "opentelemetry-instrumentation ~= 0.60b0",
+  "opentelemetry-semantic-conventions ~= 0.60b0",
+  "opentelemetry-util-genai",
+]
+
+[project.optional-dependencies]
+instruments = ["cohere >= 5.0.0"]
+test = [
+  "opentelemetry-instrumentation-cohere[instruments]",
+  "opentelemetry-sdk",
+  "opentelemetry-test-utils",
+  "pytest",
+]
+
+[project.entry-points.opentelemetry_instrumentor]
+cohere = "opentelemetry.instrumentation.cohere:CohereInstrumentor"
+
+[project.urls]
+Homepage = "https://github.com/open-telemetry/opentelemetry-python-contrib/tree/main/instrumentation-genai/opentelemetry-instrumentation-cohere"
+Repository = "https://github.com/open-telemetry/opentelemetry-python-contrib"
+
+[tool.hatch.version]
+path = "src/opentelemetry/instrumentation/cohere/version.py"
+
+[tool.hatch.build.targets.sdist]
+include = ["/src", "/tests"]
+
+[tool.hatch.build.targets.wheel]
+packages = ["src/opentelemetry"]
diff --git a/instrumentation-genai/opentelemetry-instrumentation-cohere/src/opentelemetry/instrumentation/cohere/__init__.py b/instrumentation-genai/opentelemetry-instrumentation-cohere/src/opentelemetry/instrumentation/cohere/__init__.py
new file mode 100644
index 0000000000..a816f86550
--- /dev/null
+++ b/instrumentation-genai/opentelemetry-instrumentation-cohere/src/opentelemetry/instrumentation/cohere/__init__.py
@@ -0,0 +1,123 @@
+# Copyright The OpenTelemetry Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Cohere client instrumentation supporting `cohere`, it can be enabled by
+using ``CohereInstrumentor``.
+
+.. _cohere: https://pypi.org/project/cohere/
+
+Usage
+-----
+
+.. code:: python
+
+    from cohere import ClientV2
+    from opentelemetry.instrumentation.cohere import CohereInstrumentor
+
+    CohereInstrumentor().instrument()
+
+    client = ClientV2()
+    response = client.chat(
+        model="command-r-plus",
+        messages=[
+            {"role": "user", "content": "Write a short poem on open telemetry."},
+        ],
+    )
+
+API
+---
+"""
+
+from typing import Collection
+
+from wrapt import wrap_function_wrapper
+
+from opentelemetry.instrumentation.cohere.package import _instruments
+from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
+from opentelemetry.instrumentation.utils import unwrap
+from opentelemetry.util.genai.handler import TelemetryHandler
+from opentelemetry.util.genai.types import ContentCapturingMode
+from opentelemetry.util.genai.utils import (
+    get_content_capturing_mode,
+    is_experimental_mode,
+)
+
+from .patch import (
+    async_chat_create,
+    async_chat_stream_create,
+    chat_create,
+    chat_stream_create,
+)
+
+
+class CohereInstrumentor(BaseInstrumentor):
+    def instrumentation_dependencies(self) -> Collection[str]:
+        return _instruments  # declared in .package: ("cohere >= 5.0.0",)
+
+    def _instrument(self, **kwargs):
+        """Enable Cohere instrumentation by wrapping (Async)V2Client chat APIs."""
+        tracer_provider = kwargs.get("tracer_provider")
+        meter_provider = kwargs.get("meter_provider")
+        logger_provider = kwargs.get("logger_provider")
+
+        latest_experimental_enabled = is_experimental_mode()
+        content_mode = (  # message content is only captured in experimental mode
+            get_content_capturing_mode()
+            if latest_experimental_enabled
+            else ContentCapturingMode.NO_CONTENT
+        )
+
+        handler = TelemetryHandler(
+            tracer_provider=tracer_provider,
+            meter_provider=meter_provider,
+            logger_provider=logger_provider,
+        )
+
+        # Instrument sync V2Client.chat
+        wrap_function_wrapper(
+            module="cohere.v2.client",
+            name="V2Client.chat",
+            wrapper=chat_create(handler, content_mode),
+        )
+
+        # Instrument sync V2Client.chat_stream
+        wrap_function_wrapper(
+            module="cohere.v2.client",
+            name="V2Client.chat_stream",
+            wrapper=chat_stream_create(handler, content_mode),
+        )
+
+        # Instrument async AsyncV2Client.chat
+        wrap_function_wrapper(
+            module="cohere.v2.client",
+            name="AsyncV2Client.chat",
+            wrapper=async_chat_create(handler, content_mode),
+        )
+
+        # Instrument async AsyncV2Client.chat_stream
+        wrap_function_wrapper(
+            module="cohere.v2.client",
+            name="AsyncV2Client.chat_stream",
+            wrapper=async_chat_stream_create(handler, content_mode),
+        )
+
+    def _uninstrument(self, **kwargs):
+        # Restore the original client methods wrapped in _instrument above.
+        import cohere.v2.client  # pylint: disable=import-outside-toplevel
+
+        unwrap(cohere.v2.client.V2Client, "chat")
+        unwrap(cohere.v2.client.V2Client, "chat_stream")
+        unwrap(cohere.v2.client.AsyncV2Client, "chat")
+        unwrap(cohere.v2.client.AsyncV2Client, "chat_stream")
diff --git a/instrumentation-genai/opentelemetry-instrumentation-cohere/src/opentelemetry/instrumentation/cohere/package.py b/instrumentation-genai/opentelemetry-instrumentation-cohere/src/opentelemetry/instrumentation/cohere/package.py
new file mode 100644
index 0000000000..60ee8acda8
--- /dev/null
+++ b/instrumentation-genai/opentelemetry-instrumentation-cohere/src/opentelemetry/instrumentation/cohere/package.py
@@ -0,0 +1,18 @@
+# Copyright The OpenTelemetry Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# Package specifier(s) this instrumentation targets; returned by
+# CohereInstrumentor.instrumentation_dependencies().
+_instruments = ("cohere >= 5.0.0",)
+
+# NOTE(review): presumably read by instrumentation tooling to advertise
+# metric support — confirm the consumer of this flag.
+_supports_metrics = True
diff --git a/instrumentation-genai/opentelemetry-instrumentation-cohere/src/opentelemetry/instrumentation/cohere/patch.py b/instrumentation-genai/opentelemetry-instrumentation-cohere/src/opentelemetry/instrumentation/cohere/patch.py
new file mode 100644
index 0000000000..c1b6d548d3
--- /dev/null
+++ b/instrumentation-genai/opentelemetry-instrumentation-cohere/src/opentelemetry/instrumentation/cohere/patch.py
@@ -0,0 +1,349 @@
+# Copyright The OpenTelemetry Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import annotations
+
+from opentelemetry.util.genai.handler import TelemetryHandler
+from opentelemetry.util.genai.types import (
+    ContentCapturingMode,
+    Error,
+    LLMInvocation,
+    OutputMessage,
+    Text,
+)
+
+from .utils import (
+    COHERE_PROVIDER_NAME,  # NOTE(review): imported but unused in this module
+    create_chat_invocation,
+    map_finish_reason,
+    set_response_attributes,
+)
+
+
+def chat_create(
+    handler: TelemetryHandler,
+    content_capturing_mode: ContentCapturingMode,
+):
+    """Wrap ``V2Client.chat`` to emit GenAI telemetry."""
+    # Any mode other than NO_CONTENT enables message-content capture.
+    capture_content = content_capturing_mode != ContentCapturingMode.NO_CONTENT
+
+    def traced_method(wrapped, instance, args, kwargs):
+        invocation = handler.start_llm(
+            create_chat_invocation(kwargs, instance, capture_content=capture_content)
+        )
+        try:
+            result = wrapped(*args, **kwargs)
+            set_response_attributes(invocation, result, capture_content)
+            handler.stop_llm(invocation)
+            return result
+        except Exception as error:
+            handler.fail_llm(
+                invocation, Error(type=type(error), message=str(error))
+            )
+            raise
+
+    return traced_method
+
+
+def async_chat_create(
+    handler: TelemetryHandler,
+    content_capturing_mode: ContentCapturingMode,
+):
+    """Wrap ``AsyncV2Client.chat`` to emit GenAI telemetry."""
+    capture_content = content_capturing_mode != ContentCapturingMode.NO_CONTENT
+
+    async def traced_method(wrapped, instance, args, kwargs):
+        invocation = handler.start_llm(
+            create_chat_invocation(kwargs, instance, capture_content=capture_content)
+        )
+        try:
+            result = await wrapped(*args, **kwargs)
+            set_response_attributes(invocation, result, capture_content)
+            handler.stop_llm(invocation)
+            return result
+        except Exception as error:
+            handler.fail_llm(
+                invocation, Error(type=type(error), message=str(error))
+            )
+            raise
+
+    return traced_method
+
+
+def chat_stream_create(
+    handler: TelemetryHandler,
+    content_capturing_mode: ContentCapturingMode,
+):
+    """Wrap ``V2Client.chat_stream`` to emit GenAI telemetry."""
+    capture_content = content_capturing_mode != ContentCapturingMode.NO_CONTENT
+
+    def traced_method(wrapped, instance, args, kwargs):
+        invocation = handler.start_llm(
+            create_chat_invocation(kwargs, instance, capture_content=capture_content)
+        )
+        try:
+            result = wrapped(*args, **kwargs)
+            # The invocation is finished by the wrapper when the stream ends.
+            return CohereStreamWrapper(result, handler, invocation, capture_content)
+        except Exception as error:
+            handler.fail_llm(
+                invocation, Error(type=type(error), message=str(error))
+            )
+            raise
+
+    return traced_method
+
+
+def async_chat_stream_create(
+    handler: TelemetryHandler,
+    content_capturing_mode: ContentCapturingMode,
+):
+    """Wrap ``AsyncV2Client.chat_stream`` to emit GenAI telemetry."""
+    capture_content = content_capturing_mode != ContentCapturingMode.NO_CONTENT
+
+    async def traced_method(wrapped, instance, args, kwargs):
+        invocation = handler.start_llm(
+            create_chat_invocation(kwargs, instance, capture_content=capture_content)
+        )
+        try:
+            # NOTE(review): not awaited — assumes AsyncV2Client.chat_stream
+            # returns the async iterator synchronously; confirm against the SDK.
+            result = wrapped(*args, **kwargs)
+            return AsyncCohereStreamWrapper(result, handler, invocation, capture_content)
+        except Exception as error:
+            handler.fail_llm(
+                invocation, Error(type=type(error), message=str(error))
+            )
+            raise
+
+    return traced_method
+
+
+class CohereStreamWrapper:
+    """Wraps a synchronous Cohere chat_stream iterator to capture telemetry."""
+
+    def __init__(
+        self,
+        stream,
+        handler: TelemetryHandler,
+        invocation: LLMInvocation,
+        capture_content: bool,
+    ):
+        self._stream = stream
+        self._handler = handler
+        self._invocation = invocation
+        self._capture_content = capture_content
+        self._content_parts: list[str] = []  # accumulated text deltas
+        self._finish_reason = None
+        self._response_id = None
+
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        try:
+            event = next(self._stream)
+            self._process_event(event)
+            return event
+        except StopIteration:
+            # Normal end of stream: record the final invocation state.
+            self._finalize()
+            raise
+        except Exception as error:
+            # Errors raised mid-stream fail the invocation before propagating.
+            self._handler.fail_llm(
+                self._invocation,
+                Error(type=type(error), message=str(error)),
+            )
+            raise
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        # NOTE(review): a stream abandoned before exhaustion never reaches
+        # _finalize, so stop_llm is not called on the clean-exit path here.
+        if exc_type is not None:
+            self._handler.fail_llm(
+                self._invocation,
+                Error(type=exc_type, message=str(exc_val)),
+            )
+        return False
+
+    def _process_event(self, event):
+        event_type = getattr(event, "type", None)
+
+        if event_type == "message-start":
+            # Carries the assistant role and the response id.
+            delta = getattr(event, "delta", None)
+            if delta:
+                msg = getattr(delta, "message", None)
+                if msg:
+                    role = getattr(msg, "role", None)
+                    if role:
+                        # Stashed on attributes until _finalize pops it.
+                        self._invocation.attributes["_cohere_role"] = role
+            event_id = getattr(event, "id", None)
+            if event_id:
+                self._response_id = event_id
+
+        elif event_type == "content-delta":
+            # Incremental text chunk of the assistant message.
+            delta = getattr(event, "delta", None)
+            if delta:
+                msg = getattr(delta, "message", None)
+                if msg:
+                    content = getattr(msg, "content", None)
+                    if content:
+                        text = getattr(content, "text", None)
+                        if text:
+                            self._content_parts.append(text)
+
+        elif event_type == "message-end":
+            # Carries the finish reason and token usage.
+            delta = getattr(event, "delta", None)
+            if delta:
+                self._finish_reason = getattr(delta, "finish_reason", None)
+                usage = getattr(delta, "usage", None)
+                if usage:
+                    from .utils import _set_usage
+
+                    _set_usage(self._invocation, usage)
+            event_id = getattr(event, "id", None)
+            if event_id:
+                self._response_id = event_id
+
+    def _finalize(self):
+        # Copy accumulated stream state onto the invocation, then stop it.
+        if self._response_id:
+            self._invocation.response_id = self._response_id
+
+        if self._finish_reason is not None:
+            self._invocation.finish_reasons = [
+                map_finish_reason(self._finish_reason)
+            ]
+
+        if self._capture_content and self._content_parts:
+            role = self._invocation.attributes.pop("_cohere_role", "assistant")
+            full_text = "".join(self._content_parts)
+            self._invocation.output_messages = [
+                OutputMessage(
+                    role=role,
+                    parts=[Text(content=full_text)],
+                    finish_reason=map_finish_reason(self._finish_reason),
+                )
+            ]
+        else:
+            # Drop the stashed role so it never leaks as a real attribute.
+            self._invocation.attributes.pop("_cohere_role", None)
+
+        self._handler.stop_llm(self._invocation)
+
+
+class AsyncCohereStreamWrapper:
+    """Wraps an async Cohere chat_stream iterator to capture telemetry."""
+
+    def __init__(
+        self,
+        stream,
+        handler: TelemetryHandler,
+        invocation: LLMInvocation,
+        capture_content: bool,
+    ):
+        self._stream = stream
+        self._handler = handler
+        self._invocation = invocation
+        self._capture_content = capture_content
+        self._content_parts: list[str] = []  # accumulated text deltas
+        self._finish_reason = None
+        self._response_id = None
+
+    def __aiter__(self):
+        return self
+
+    async def __anext__(self):
+        try:
+            event = await self._stream.__anext__()
+            self._process_event(event)
+            return event
+        except StopAsyncIteration:
+            # Normal end of stream: record the final invocation state.
+            self._finalize()
+            raise
+        except Exception as error:
+            self._handler.fail_llm(
+                self._invocation,
+                Error(type=type(error), message=str(error)),
+            )
+            raise
+
+    async def __aenter__(self):
+        return self
+
+    async def __aexit__(self, exc_type, exc_val, exc_tb):
+        # NOTE(review): same abandoned-stream caveat as the sync wrapper.
+        if exc_type is not None:
+            self._handler.fail_llm(
+                self._invocation,
+                Error(type=exc_type, message=str(exc_val)),
+            )
+        return False
+
+    def _process_event(self, event):
+        # Mirrors CohereStreamWrapper._process_event.
+        event_type = getattr(event, "type", None)
+
+        if event_type == "message-start":
+            delta = getattr(event, "delta", None)
+            if delta:
+                msg = getattr(delta, "message", None)
+                if msg:
+                    role = getattr(msg, "role", None)
+                    if role:
+                        self._invocation.attributes["_cohere_role"] = role
+            event_id = getattr(event, "id", None)
+            if event_id:
+                self._response_id = event_id
+
+        elif event_type == "content-delta":
+            delta = getattr(event, "delta", None)
+            if delta:
+                msg = getattr(delta, "message", None)
+                if msg:
+                    content = getattr(msg, "content", None)
+                    if content:
+                        text = getattr(content, "text", None)
+                        if text:
+                            self._content_parts.append(text)
+
+        elif event_type == "message-end":
+            delta = getattr(event, "delta", None)
+            if delta:
+                self._finish_reason = getattr(delta, "finish_reason", None)
+                usage = getattr(delta, "usage", None)
+                if usage:
+                    from .utils import _set_usage
+
+                    _set_usage(self._invocation, usage)
+            event_id = getattr(event, "id", None)
+            if event_id:
+                self._response_id = event_id
+
+    def _finalize(self):
+        # Mirrors CohereStreamWrapper._finalize.
+        if self._response_id:
+            self._invocation.response_id = self._response_id
+
+        if self._finish_reason is not None:
+            self._invocation.finish_reasons = [
+                map_finish_reason(self._finish_reason)
+            ]
+
+        if self._capture_content and self._content_parts:
+            role = self._invocation.attributes.pop("_cohere_role", "assistant")
+            full_text = "".join(self._content_parts)
+            self._invocation.output_messages = [
+                OutputMessage(
+                    role=role,
+                    parts=[Text(content=full_text)],
+                    finish_reason=map_finish_reason(self._finish_reason),
+                )
+            ]
+        else:
+            self._invocation.attributes.pop("_cohere_role", None)
+
+        self._handler.stop_llm(self._invocation)
diff --git a/instrumentation-genai/opentelemetry-instrumentation-cohere/src/opentelemetry/instrumentation/cohere/utils.py b/instrumentation-genai/opentelemetry-instrumentation-cohere/src/opentelemetry/instrumentation/cohere/utils.py
new file mode 100644
index 0000000000..0655bbc37a
--- /dev/null
+++ b/instrumentation-genai/opentelemetry-instrumentation-cohere/src/opentelemetry/instrumentation/cohere/utils.py
@@ -0,0 +1,229 @@
+# Copyright The OpenTelemetry Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+ +from __future__ import annotations + +from typing import Any, List, Optional + +from opentelemetry.util.genai.types import ( + InputMessage, + LLMInvocation, + OutputMessage, + Text, + ToolCallRequest, +) + +COHERE_PROVIDER_NAME = "cohere" + +# Mapping from Cohere finish reasons to GenAI semantic convention finish reasons +_FINISH_REASON_MAP = { + "COMPLETE": "stop", + "STOP_SEQUENCE": "stop", + "MAX_TOKENS": "length", + "TOOL_CALL": "tool_calls", + "ERROR": "error", + "TIMEOUT": "error", +} + + +def map_finish_reason(cohere_reason: Optional[str]) -> str: + if cohere_reason is None: + return "error" + return _FINISH_REASON_MAP.get(str(cohere_reason), str(cohere_reason).lower()) + + +def get_server_address_and_port( + client_instance: Any, +) -> tuple[Optional[str], Optional[int]]: + """Extract server address and port from the Cohere client instance.""" + base_url = getattr(client_instance, "base_url", None) + if base_url is None: + # Check nested _client pattern + inner = getattr(client_instance, "_client", None) + if inner is not None: + base_url = getattr(inner, "base_url", None) + + if base_url is None: + return "api.cohere.com", None + + if isinstance(base_url, str): + from urllib.parse import urlparse + + parsed = urlparse(base_url) + address = parsed.hostname or "api.cohere.com" + port = parsed.port + if port == 443: + port = None + return address, port + + return "api.cohere.com", None + + +def create_chat_invocation( + kwargs: dict[str, Any], + client_instance: Any, + capture_content: bool, +) -> LLMInvocation: + """Create an LLMInvocation from Cohere chat kwargs.""" + invocation = LLMInvocation(request_model=kwargs.get("model", "")) + invocation.provider = COHERE_PROVIDER_NAME + invocation.temperature = kwargs.get("temperature") + invocation.top_p = kwargs.get("p") + invocation.max_tokens = kwargs.get("max_tokens") + invocation.frequency_penalty = kwargs.get("frequency_penalty") + invocation.presence_penalty = kwargs.get("presence_penalty") + 
invocation.seed = kwargs.get("seed") + + stop_sequences = kwargs.get("stop_sequences") + if stop_sequences is not None: + if isinstance(stop_sequences, str): + stop_sequences = [stop_sequences] + invocation.stop_sequences = list(stop_sequences) + + address, port = get_server_address_and_port(client_instance) + if address: + invocation.server_address = address + if port: + invocation.server_port = port + + if capture_content: + invocation.input_messages = _extract_input_messages( + kwargs.get("messages", []) + ) + + return invocation + + +def set_response_attributes( + invocation: LLMInvocation, + response: Any, + capture_content: bool, +) -> None: + """Populate invocation attributes from a Cohere V2ChatResponse.""" + if response is None: + return + + response_id = getattr(response, "id", None) + if response_id: + invocation.response_id = response_id + + finish_reason = getattr(response, "finish_reason", None) + if finish_reason is not None: + invocation.finish_reasons = [map_finish_reason(finish_reason)] + + usage = getattr(response, "usage", None) + if usage is not None: + _set_usage(invocation, usage) + + if capture_content: + message = getattr(response, "message", None) + if message is not None: + invocation.output_messages = _extract_output_messages( + message, finish_reason + ) + + +def _set_usage(invocation: LLMInvocation, usage: Any) -> None: + """Extract token usage from Cohere Usage object.""" + tokens = getattr(usage, "tokens", None) + if tokens is not None: + input_tokens = getattr(tokens, "input_tokens", None) + if input_tokens is not None: + invocation.input_tokens = int(input_tokens) + output_tokens = getattr(tokens, "output_tokens", None) + if output_tokens is not None: + invocation.output_tokens = int(output_tokens) + return + + # Fallback to billed_units + billed = getattr(usage, "billed_units", None) + if billed is not None: + input_tokens = getattr(billed, "input_tokens", None) + if input_tokens is not None: + invocation.input_tokens = 
int(input_tokens) + output_tokens = getattr(billed, "output_tokens", None) + if output_tokens is not None: + invocation.output_tokens = int(output_tokens) + + +def _extract_input_messages( + messages: List[Any], +) -> List[InputMessage]: + """Convert Cohere chat messages to InputMessage list.""" + result = [] + for msg in messages: + if isinstance(msg, dict): + role = msg.get("role", "user") + content = msg.get("content", "") + else: + role = getattr(msg, "role", "user") + content = getattr(msg, "content", "") + + parts: list = [] + if isinstance(content, str): + parts.append(Text(content=content)) + elif isinstance(content, list): + # Handle structured content items + for item in content: + if isinstance(item, dict): + text = item.get("text", "") + if text: + parts.append(Text(content=text)) + elif isinstance(item, str): + parts.append(Text(content=item)) + else: + text = getattr(item, "text", None) + if text: + parts.append(Text(content=str(text))) + + result.append(InputMessage(role=str(role), parts=parts)) + return result + + +def _extract_output_messages( + message: Any, + finish_reason: Any, +) -> List[OutputMessage]: + """Convert a Cohere AssistantMessageResponse to OutputMessage list.""" + parts: list = [] + + content_items = getattr(message, "content", None) + if content_items: + for item in content_items: + item_type = getattr(item, "type", None) + if item_type == "text": + text = getattr(item, "text", "") + parts.append(Text(content=str(text))) + + tool_calls = getattr(message, "tool_calls", None) + if tool_calls: + for tc in tool_calls: + tc_id = getattr(tc, "id", None) + tc_name = "" + tc_args = None + func = getattr(tc, "function", None) + if func: + tc_name = getattr(func, "name", "") or "" + tc_args = getattr(func, "arguments", None) + else: + tc_name = getattr(tc, "name", "") or "" + tc_args = getattr(tc, "parameters", None) + parts.append( + ToolCallRequest(id=tc_id, name=tc_name, arguments=tc_args) + ) + + role = getattr(message, "role", 
"assistant") or "assistant" + mapped_reason = map_finish_reason(finish_reason) + + return [OutputMessage(role=str(role), parts=parts, finish_reason=mapped_reason)] diff --git a/instrumentation-genai/opentelemetry-instrumentation-cohere/src/opentelemetry/instrumentation/cohere/version.py b/instrumentation-genai/opentelemetry-instrumentation-cohere/src/opentelemetry/instrumentation/cohere/version.py new file mode 100644 index 0000000000..e7bf4a48eb --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-cohere/src/opentelemetry/instrumentation/cohere/version.py @@ -0,0 +1,15 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +__version__ = "0.1b0.dev" diff --git a/instrumentation-genai/opentelemetry-instrumentation-cohere/tests/__init__.py b/instrumentation-genai/opentelemetry-instrumentation-cohere/tests/__init__.py new file mode 100644 index 0000000000..b0a6f42841 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-cohere/tests/__init__.py @@ -0,0 +1,13 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/instrumentation-genai/opentelemetry-instrumentation-cohere/tests/conftest.py b/instrumentation-genai/opentelemetry-instrumentation-cohere/tests/conftest.py new file mode 100644 index 0000000000..2d9c084eaf --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-cohere/tests/conftest.py @@ -0,0 +1,133 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +"""Test configuration for Cohere instrumentation tests.""" + +import os + +import pytest + +from opentelemetry.instrumentation._semconv import ( + OTEL_SEMCONV_STABILITY_OPT_IN, + _OpenTelemetrySemanticConventionStability, +) +from opentelemetry.instrumentation.cohere import CohereInstrumentor +from opentelemetry.sdk._logs import LoggerProvider +from opentelemetry.util.genai.environment_variables import ( + OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT, +) + +try: + from opentelemetry.sdk._logs.export import ( + InMemoryLogRecordExporter, + SimpleLogRecordProcessor, + ) +except ImportError: + from opentelemetry.sdk._logs.export import ( + InMemoryLogExporter as InMemoryLogRecordExporter, + ) + from opentelemetry.sdk._logs.export import ( + SimpleLogRecordProcessor, + ) + +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics.export import InMemoryMetricReader +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( + InMemorySpanExporter, +) + + +@pytest.fixture(scope="function", name="span_exporter") +def fixture_span_exporter(): + exporter = InMemorySpanExporter() + yield exporter + + +@pytest.fixture(scope="function", name="log_exporter") +def fixture_log_exporter(): + exporter = InMemoryLogRecordExporter() + yield exporter + + +@pytest.fixture(scope="function", name="metric_reader") +def fixture_metric_reader(): + reader = InMemoryMetricReader() + yield reader + + +@pytest.fixture(scope="function", name="tracer_provider") +def fixture_tracer_provider(span_exporter): + provider = TracerProvider() + provider.add_span_processor(SimpleSpanProcessor(span_exporter)) + return provider + + +@pytest.fixture(scope="function", name="logger_provider") +def fixture_logger_provider(log_exporter): + provider = LoggerProvider() + provider.add_log_record_processor(SimpleLogRecordProcessor(log_exporter)) + return 
provider + + +@pytest.fixture(scope="function", name="meter_provider") +def fixture_meter_provider(metric_reader): + meter_provider = MeterProvider( + metric_readers=[metric_reader], + ) + return meter_provider + + +@pytest.fixture(autouse=True) +def environment(): + if not os.getenv("CO_API_KEY"): + os.environ["CO_API_KEY"] = "test_cohere_api_key" + + +@pytest.fixture(scope="function") +def instrument_no_content(tracer_provider, logger_provider, meter_provider): + _OpenTelemetrySemanticConventionStability._initialized = False + os.environ[OTEL_SEMCONV_STABILITY_OPT_IN] = "gen_ai_latest_experimental" + + instrumentor = CohereInstrumentor() + instrumentor.instrument( + tracer_provider=tracer_provider, + logger_provider=logger_provider, + meter_provider=meter_provider, + ) + + yield instrumentor + os.environ.pop(OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT, None) + os.environ.pop(OTEL_SEMCONV_STABILITY_OPT_IN, None) + instrumentor.uninstrument() + + +@pytest.fixture(scope="function") +def instrument_with_content(tracer_provider, logger_provider, meter_provider): + _OpenTelemetrySemanticConventionStability._initialized = False + os.environ[OTEL_SEMCONV_STABILITY_OPT_IN] = "gen_ai_latest_experimental" + os.environ[OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT] = "span_only" + + instrumentor = CohereInstrumentor() + instrumentor.instrument( + tracer_provider=tracer_provider, + logger_provider=logger_provider, + meter_provider=meter_provider, + ) + + yield instrumentor + os.environ.pop(OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT, None) + os.environ.pop(OTEL_SEMCONV_STABILITY_OPT_IN, None) + instrumentor.uninstrument() diff --git a/instrumentation-genai/opentelemetry-instrumentation-cohere/tests/test_async_chat_completions.py b/instrumentation-genai/opentelemetry-instrumentation-cohere/tests/test_async_chat_completions.py new file mode 100644 index 0000000000..cbf18c450c --- /dev/null +++ 
b/instrumentation-genai/opentelemetry-instrumentation-cohere/tests/test_async_chat_completions.py @@ -0,0 +1,137 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tests for async Cohere chat completions instrumentation.""" + +import asyncio + +import httpx +import pytest + +from opentelemetry.semconv._incubating.attributes import ( + gen_ai_attributes as GenAIAttributes, +) + + +def _chat_response_json( + response_id="async-response-id", + finish_reason="COMPLETE", + content_text="Hello from async!", + input_tokens=10, + output_tokens=20, +): + return { + "id": response_id, + "finish_reason": finish_reason, + "message": { + "role": "assistant", + "content": [{"type": "text", "text": content_text}], + }, + "usage": { + "tokens": { + "input_tokens": input_tokens, + "output_tokens": output_tokens, + }, + }, + } + + +def _make_async_client(response_json=None, handler=None): + """Create a Cohere AsyncClientV2 with a mock HTTP transport.""" + if handler is None: + body = response_json or _chat_response_json() + + async def default_handler(request): + return httpx.Response(200, json=body) + + handler = default_handler + + transport = httpx.MockTransport(handler) + httpx_client = httpx.AsyncClient(transport=transport) + + from cohere import AsyncClientV2 + + return AsyncClientV2(api_key="test-key", httpx_client=httpx_client) + + +class TestAsyncChatCompletions: + """Test async chat completions.""" + + 
@pytest.mark.usefixtures("instrument_no_content") + def test_async_chat_basic(self, span_exporter): + async def run(): + client = _make_async_client() + response = await client.chat( + model="command-r-plus", + messages=[{"role": "user", "content": "Hello async"}], + ) + return response + + response = asyncio.run(run()) + assert response.id == "async-response-id" + + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + + span = spans[0] + assert span.name == "chat command-r-plus" + attrs = dict(span.attributes) + assert attrs[GenAIAttributes.GEN_AI_OPERATION_NAME] == "chat" + assert attrs[GenAIAttributes.GEN_AI_REQUEST_MODEL] == "command-r-plus" + assert attrs[GenAIAttributes.GEN_AI_PROVIDER_NAME] == "cohere" + assert attrs[GenAIAttributes.GEN_AI_USAGE_INPUT_TOKENS] == 10 + assert attrs[GenAIAttributes.GEN_AI_USAGE_OUTPUT_TOKENS] == 20 + assert attrs[GenAIAttributes.GEN_AI_RESPONSE_FINISH_REASONS] == ("stop",) + + @pytest.mark.usefixtures("instrument_no_content") + def test_async_chat_error(self, span_exporter): + async def error_handler(request): + raise httpx.ConnectError("Async connection refused") + + async def run(): + client = _make_async_client(handler=error_handler) + await client.chat( + model="command-r-plus", + messages=[{"role": "user", "content": "Hello"}], + ) + + with pytest.raises(Exception): + asyncio.run(run()) + + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + assert spans[0].status.status_code.name == "ERROR" + + @pytest.mark.usefixtures("instrument_with_content") + def test_async_chat_with_content(self, span_exporter): + async def run(): + client = _make_async_client( + response_json=_chat_response_json( + content_text="Async content capture test" + ) + ) + await client.chat( + model="command-r-plus", + messages=[{"role": "user", "content": "Test content"}], + ) + + asyncio.run(run()) + + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + attrs = dict(spans[0].attributes) + assert 
GenAIAttributes.GEN_AI_INPUT_MESSAGES in attrs + assert GenAIAttributes.GEN_AI_OUTPUT_MESSAGES in attrs + assert "Test content" in attrs[GenAIAttributes.GEN_AI_INPUT_MESSAGES] + assert "Async content capture test" in attrs[GenAIAttributes.GEN_AI_OUTPUT_MESSAGES] diff --git a/instrumentation-genai/opentelemetry-instrumentation-cohere/tests/test_chat_completions.py b/instrumentation-genai/opentelemetry-instrumentation-cohere/tests/test_chat_completions.py new file mode 100644 index 0000000000..09e4ebaadc --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-cohere/tests/test_chat_completions.py @@ -0,0 +1,381 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tests for Cohere chat completions instrumentation.""" + +import json + +import httpx +import pytest + +from opentelemetry.instrumentation.cohere import CohereInstrumentor +from opentelemetry.semconv._incubating.attributes import ( + gen_ai_attributes as GenAIAttributes, +) +from opentelemetry.semconv.attributes import ( + server_attributes as ServerAttributes, +) + + +def _chat_response_json( + response_id="test-response-id", + finish_reason="COMPLETE", + content_text="Hello! 
How can I help you?", + input_tokens=10, + output_tokens=20, +): + return { + "id": response_id, + "finish_reason": finish_reason, + "message": { + "role": "assistant", + "content": [{"type": "text", "text": content_text}], + }, + "usage": { + "tokens": { + "input_tokens": input_tokens, + "output_tokens": output_tokens, + }, + }, + } + + +def _make_client(response_json=None, handler=None): + """Create a Cohere ClientV2 with a mock HTTP transport.""" + if handler is None: + body = response_json or _chat_response_json() + + def handler(request): + return httpx.Response(200, json=body) + + transport = httpx.MockTransport(handler) + httpx_client = httpx.Client(transport=transport) + + from cohere import ClientV2 + + return ClientV2(api_key="test-key", httpx_client=httpx_client) + + + + +def _make_stream_handler(events): + """Create an httpx handler that returns SSE-formatted stream events.""" + + def handler(request): + lines = [] + for event in events: + lines.append(f"data: {json.dumps(event)}") + lines.append("") # blank line = SSE event separator + body = "\n".join(lines) + "\n" + return httpx.Response( + 200, + content=body.encode(), + headers={"content-type": "text/event-stream"}, + ) + + return handler + + +def _stream_events( + content_parts=None, + finish_reason="COMPLETE", + input_tokens=5, + output_tokens=15, + stream_id="stream-id-123", +): + """Generate a list of SSE events for a streaming chat response.""" + if content_parts is None: + content_parts = ["Hello ", "world!"] + + events = [ + { + "type": "message-start", + "id": stream_id, + "delta": {"message": {"role": "assistant"}}, + }, + ] + for text in content_parts: + events.append( + { + "type": "content-delta", + "index": 0, + "delta": {"message": {"content": {"text": text}}}, + } + ) + events.append( + { + "type": "message-end", + "id": stream_id, + "delta": { + "finish_reason": finish_reason, + "usage": { + "tokens": { + "input_tokens": input_tokens, + "output_tokens": output_tokens, + }, + }, + }, 
+ } + ) + return events + +class TestChatCompletionsNoContent: + """Test sync chat completions without content capture.""" + + @pytest.mark.usefixtures("instrument_no_content") + def test_chat_basic(self, span_exporter): + client = _make_client() + response = client.chat( + model="command-r-plus", + messages=[{"role": "user", "content": "Hello"}], + ) + + assert response.id == "test-response-id" + + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + + span = spans[0] + assert span.name == "chat command-r-plus" + attrs = dict(span.attributes) + assert attrs[GenAIAttributes.GEN_AI_OPERATION_NAME] == "chat" + assert attrs[GenAIAttributes.GEN_AI_REQUEST_MODEL] == "command-r-plus" + assert attrs[GenAIAttributes.GEN_AI_PROVIDER_NAME] == "cohere" + assert attrs[GenAIAttributes.GEN_AI_USAGE_INPUT_TOKENS] == 10 + assert attrs[GenAIAttributes.GEN_AI_USAGE_OUTPUT_TOKENS] == 20 + assert attrs[GenAIAttributes.GEN_AI_RESPONSE_FINISH_REASONS] == ("stop",) + assert attrs[GenAIAttributes.GEN_AI_RESPONSE_ID] == "test-response-id" + assert attrs[ServerAttributes.SERVER_ADDRESS] == "api.cohere.com" + + # No content should be captured + assert GenAIAttributes.GEN_AI_INPUT_MESSAGES not in attrs + assert GenAIAttributes.GEN_AI_OUTPUT_MESSAGES not in attrs + + @pytest.mark.usefixtures("instrument_no_content") + def test_chat_with_optional_params(self, span_exporter): + client = _make_client() + client.chat( + model="command-r-plus", + messages=[{"role": "user", "content": "Hello"}], + temperature=0.7, + max_tokens=100, + p=0.9, + frequency_penalty=0.5, + presence_penalty=0.3, + seed=42, + stop_sequences=["END"], + ) + + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + attrs = dict(spans[0].attributes) + assert attrs[GenAIAttributes.GEN_AI_REQUEST_TEMPERATURE] == 0.7 + assert attrs[GenAIAttributes.GEN_AI_REQUEST_MAX_TOKENS] == 100 + assert attrs[GenAIAttributes.GEN_AI_REQUEST_TOP_P] == 0.9 + assert 
attrs[GenAIAttributes.GEN_AI_REQUEST_FREQUENCY_PENALTY] == 0.5 + assert attrs[GenAIAttributes.GEN_AI_REQUEST_PRESENCE_PENALTY] == 0.3 + assert attrs[GenAIAttributes.GEN_AI_REQUEST_SEED] == 42 + assert attrs[GenAIAttributes.GEN_AI_REQUEST_STOP_SEQUENCES] == ("END",) + + @pytest.mark.usefixtures("instrument_no_content") + def test_chat_max_tokens_finish(self, span_exporter): + client = _make_client( + response_json=_chat_response_json(finish_reason="MAX_TOKENS") + ) + client.chat( + model="command-r-plus", + messages=[{"role": "user", "content": "Hello"}], + ) + + spans = span_exporter.get_finished_spans() + attrs = dict(spans[0].attributes) + assert attrs[GenAIAttributes.GEN_AI_RESPONSE_FINISH_REASONS] == ("length",) + + @pytest.mark.usefixtures("instrument_no_content") + def test_chat_error(self, span_exporter): + def error_handler(request): + raise httpx.ConnectError("Connection refused") + + client = _make_client(handler=error_handler) + with pytest.raises(Exception): + client.chat( + model="command-r-plus", + messages=[{"role": "user", "content": "Hello"}], + ) + + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + span = spans[0] + assert span.status.status_code.name == "ERROR" + + +class TestChatCompletionsWithContent: + """Test sync chat completions with content capture enabled.""" + + @pytest.mark.usefixtures("instrument_with_content") + def test_chat_captures_content(self, span_exporter): + client = _make_client( + response_json=_chat_response_json( + content_text="I'm doing great, thanks!" + ) + ) + client.chat( + model="command-r-plus", + messages=[{"role": "user", "content": "How are you?"}], + ) + + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + attrs = dict(spans[0].attributes) + + assert GenAIAttributes.GEN_AI_INPUT_MESSAGES in attrs + assert GenAIAttributes.GEN_AI_OUTPUT_MESSAGES in attrs + + input_msgs = attrs[GenAIAttributes.GEN_AI_INPUT_MESSAGES] + assert "How are you?" 
in input_msgs + + output_msgs = attrs[GenAIAttributes.GEN_AI_OUTPUT_MESSAGES] + assert "I'm doing great, thanks!" in output_msgs + + @pytest.mark.usefixtures("instrument_with_content") + def test_chat_multi_message(self, span_exporter): + client = _make_client( + response_json=_chat_response_json( + content_text="The capital of France is Paris." + ) + ) + client.chat( + model="command-r-plus", + messages=[ + {"role": "system", "content": "You are a geography expert."}, + {"role": "user", "content": "What is the capital of France?"}, + ], + ) + + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + attrs = dict(spans[0].attributes) + input_msgs = attrs[GenAIAttributes.GEN_AI_INPUT_MESSAGES] + assert "geography expert" in input_msgs + assert "capital of France" in input_msgs + + + + +class TestChatStreamNoContent: + """Test streaming chat completions without content capture.""" + + @pytest.mark.usefixtures("instrument_no_content") + def test_chat_stream_basic(self, span_exporter): + events = _stream_events() + client = _make_client(handler=_make_stream_handler(events)) + + chunks = list( + client.chat_stream( + model="command-r-plus", + messages=[{"role": "user", "content": "Hello"}], + ) + ) + + # message-start + 2 content-delta + message-end + assert len(chunks) >= 3 + + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + + span = spans[0] + assert span.name == "chat command-r-plus" + attrs = dict(span.attributes) + assert attrs[GenAIAttributes.GEN_AI_OPERATION_NAME] == "chat" + assert attrs[GenAIAttributes.GEN_AI_REQUEST_MODEL] == "command-r-plus" + assert attrs[GenAIAttributes.GEN_AI_PROVIDER_NAME] == "cohere" + assert attrs[GenAIAttributes.GEN_AI_USAGE_INPUT_TOKENS] == 5 + assert attrs[GenAIAttributes.GEN_AI_USAGE_OUTPUT_TOKENS] == 15 + assert attrs[GenAIAttributes.GEN_AI_RESPONSE_FINISH_REASONS] == ("stop",) + assert attrs[GenAIAttributes.GEN_AI_RESPONSE_ID] == "stream-id-123" + + # No content captured + assert 
GenAIAttributes.GEN_AI_OUTPUT_MESSAGES not in attrs + + +class TestChatStreamWithContent: + """Test streaming chat completions with content capture enabled.""" + + @pytest.mark.usefixtures("instrument_with_content") + def test_chat_stream_captures_content(self, span_exporter): + events = _stream_events( + content_parts=["Streamed ", "response"], + stream_id="stream-id-456", + input_tokens=8, + output_tokens=12, + ) + client = _make_client(handler=_make_stream_handler(events)) + + list( + client.chat_stream( + model="command-r-plus", + messages=[{"role": "user", "content": "Stream test"}], + ) + ) + + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + attrs = dict(spans[0].attributes) + + assert GenAIAttributes.GEN_AI_OUTPUT_MESSAGES in attrs + output_msgs = attrs[GenAIAttributes.GEN_AI_OUTPUT_MESSAGES] + assert "Streamed response" in output_msgs + +class TestUninstrument: + """Test that uninstrumenting properly restores original methods.""" + + def test_uninstrument( + self, tracer_provider, logger_provider, meter_provider, span_exporter + ): + import os + + from opentelemetry.instrumentation._semconv import ( + OTEL_SEMCONV_STABILITY_OPT_IN, + _OpenTelemetrySemanticConventionStability, + ) + + _OpenTelemetrySemanticConventionStability._initialized = False + os.environ[OTEL_SEMCONV_STABILITY_OPT_IN] = "gen_ai_latest_experimental" + + instrumentor = CohereInstrumentor() + instrumentor.instrument( + tracer_provider=tracer_provider, + logger_provider=logger_provider, + meter_provider=meter_provider, + ) + + client = _make_client() + client.chat( + model="command-r-plus", + messages=[{"role": "user", "content": "Hello"}], + ) + assert len(span_exporter.get_finished_spans()) == 1 + + instrumentor.uninstrument() + span_exporter.clear() + + client2 = _make_client() + client2.chat( + model="command-r-plus", + messages=[{"role": "user", "content": "Hello again"}], + ) + # After uninstrument, no new spans should be created + assert 
len(span_exporter.get_finished_spans()) == 0 + + os.environ.pop(OTEL_SEMCONV_STABILITY_OPT_IN, None) diff --git a/instrumentation-genai/opentelemetry-instrumentation-cohere/tests/test_utils.py b/instrumentation-genai/opentelemetry-instrumentation-cohere/tests/test_utils.py new file mode 100644 index 0000000000..baaf7e5119 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-cohere/tests/test_utils.py @@ -0,0 +1,70 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
"""Tests for Cohere instrumentation utility functions."""

from types import SimpleNamespace

import pytest

from opentelemetry.instrumentation.cohere.utils import (
    get_server_address_and_port,
    map_finish_reason,
)


class TestMapFinishReason:
    """Cohere finish reasons translate onto GenAI semconv values."""

    def test_complete(self):
        assert map_finish_reason("COMPLETE") == "stop"

    def test_stop_sequence(self):
        assert map_finish_reason("STOP_SEQUENCE") == "stop"

    def test_max_tokens(self):
        assert map_finish_reason("MAX_TOKENS") == "length"

    def test_tool_call(self):
        assert map_finish_reason("TOOL_CALL") == "tool_calls"

    def test_error(self):
        assert map_finish_reason("ERROR") == "error"

    def test_timeout(self):
        assert map_finish_reason("TIMEOUT") == "error"

    def test_none(self):
        # A missing finish reason is reported as an error.
        assert map_finish_reason(None) == "error"

    def test_unknown(self):
        # Unmapped reasons pass through lowercased.
        assert map_finish_reason("UNKNOWN_REASON") == "unknown_reason"


class TestGetServerAddressAndPort:
    """Server address/port extraction from a Cohere client instance."""

    def test_default_address(self):
        # A client exposing no base_url falls back to the public API host.
        bare_client = SimpleNamespace()
        assert get_server_address_and_port(bare_client) == (
            "api.cohere.com",
            None,
        )

    def test_custom_base_url(self):
        custom_client = SimpleNamespace(
            base_url="https://custom.cohere.example.com:8443/v2"
        )
        host, port = get_server_address_and_port(custom_client)
        assert host == "custom.cohere.example.com"
        assert port == 8443

    def test_standard_https_port_omitted(self):
        # Port 443 is implied by https and must be reported as None.
        https_client = SimpleNamespace(
            base_url="https://api.cohere.com:443/v2"
        )
        assert get_server_address_and_port(https_client) == (
            "api.cohere.com",
            None,
        )