diff --git a/util/opentelemetry-util-genai/tests/test_handler_metrics.py b/util/opentelemetry-util-genai/tests/test_handler_metrics.py index 6db12f0cd2..b45383abfe 100644 --- a/util/opentelemetry-util-genai/tests/test_handler_metrics.py +++ b/util/opentelemetry-util-genai/tests/test_handler_metrics.py @@ -1,38 +1,22 @@ from __future__ import annotations -from typing import Any, Dict, List, Tuple -from unittest import TestCase +from typing import Any, Dict, List from unittest.mock import patch -from opentelemetry.sdk.metrics import MeterProvider -from opentelemetry.sdk.metrics.export import InMemoryMetricReader -from opentelemetry.sdk.trace import TracerProvider -from opentelemetry.sdk.trace.export import SimpleSpanProcessor -from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( - InMemorySpanExporter, -) from opentelemetry.semconv._incubating.attributes import ( gen_ai_attributes as GenAI, ) from opentelemetry.semconv.schemas import Schemas +from opentelemetry.test.test_base import TestBase from opentelemetry.util.genai.handler import TelemetryHandler from opentelemetry.util.genai.types import Error, LLMInvocation _DEFAULT_SCHEMA_URL = Schemas.V1_37_0.value +SCOPE = "opentelemetry.util.genai.handler" -class TelemetryHandlerMetricsTest(TestCase): - def setUp(self) -> None: - self.metric_reader = InMemoryMetricReader() - self.meter_provider = MeterProvider( - metric_readers=[self.metric_reader] - ) - self.span_exporter = InMemorySpanExporter() - self.tracer_provider = TracerProvider() - self.tracer_provider.add_span_processor( - SimpleSpanProcessor(self.span_exporter) - ) +class TelemetryHandlerMetricsTest(TestBase): def test_stop_llm_records_duration_and_tokens(self) -> None: handler = TelemetryHandler( tracer_provider=self.tracer_provider, @@ -52,10 +36,8 @@ def test_stop_llm_records_duration_and_tokens(self) -> None: ): handler.stop_llm(invocation) - metrics, resource_metrics = self._harvest_metrics() - self._assert_metric_scope_schema_urls( - resource_metrics, _DEFAULT_SCHEMA_URL - ) + self._assert_metric_scope_schema_urls(_DEFAULT_SCHEMA_URL) + metrics = self._harvest_metrics() self.assertIn("gen_ai.client.operation.duration", metrics) duration_points = metrics["gen_ai.client.operation.duration"] self.assertEqual(len(duration_points), 1) @@ -110,10 +92,8 @@ def test_stop_llm_records_duration_and_tokens_with_additional_attributes( invocation.attributes = {"should not be on metrics": "value"} handler.stop_llm(invocation) - metrics, resource_metrics = self._harvest_metrics() - self._assert_metric_scope_schema_urls( - resource_metrics, _DEFAULT_SCHEMA_URL - ) + self._assert_metric_scope_schema_urls(_DEFAULT_SCHEMA_URL) + metrics = self._harvest_metrics() self.assertIn("gen_ai.client.operation.duration", metrics) duration_points = metrics["gen_ai.client.operation.duration"] self.assertIn("gen_ai.client.token.usage", metrics) @@ -148,10 +128,8 @@ def test_fail_llm_records_error_and_available_tokens(self) -> None: ): handler.fail_llm(invocation, error) - metrics, resource_metrics = self._harvest_metrics() - self._assert_metric_scope_schema_urls( - resource_metrics, _DEFAULT_SCHEMA_URL - ) + self._assert_metric_scope_schema_urls(_DEFAULT_SCHEMA_URL) + metrics = self._harvest_metrics() self.assertIn("gen_ai.client.operation.duration", metrics) duration_points = metrics["gen_ai.client.operation.duration"] self.assertEqual(len(duration_points), 1) @@ -177,35 +155,29 @@ def test_fail_llm_records_error_and_available_tokens(self) -> None: def _harvest_metrics( self, - ) -> Tuple[Dict[str, List[Any]], List[Any]]: + ) -> Dict[str, List[Any]]: """Returns (metrics_by_name, resource_metrics). metrics_by_name maps metric name to list of data points. resource_metrics is the raw ResourceMetrics list for scope-level assertions (e.g. schema_url). """ - try: - self.meter_provider.force_flush() - except Exception: # pylint: disable=broad-except - assert False, "force_flush raised an exception" - self.metric_reader.collect() + metrics = self.get_sorted_metrics(SCOPE) metrics_by_name: Dict[str, List[Any]] = {} - metrics_data = self.metric_reader.get_metrics_data() - resource_metrics = ( - metrics_data and metrics_data.resource_metrics - ) or [] - for resource_metric in resource_metrics: - for scope_metric in resource_metric.scope_metrics or []: - for metric in scope_metric.metrics or []: - points = metric.data.data_points or [] - metrics_by_name.setdefault(metric.name, []).extend(points) - return metrics_by_name, resource_metrics + for metric in metrics or []: + points = metric.data.data_points or [] + metrics_by_name.setdefault(metric.name, []).extend(points) + return metrics_by_name def _assert_metric_scope_schema_urls( - self, resource_metrics: List[Any], expected_schema_url: str + self, expected_schema_url: str ) -> None: - for resource_metric in resource_metrics: + for ( + resource_metric + ) in self.memory_metrics_reader.get_metrics_data().resource_metrics: for scope_metric in resource_metric.scope_metrics: + if scope_metric.scope.name != SCOPE: + continue self.assertEqual( scope_metric.scope.schema_url, expected_schema_url )