Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 22 additions & 50 deletions util/opentelemetry-util-genai/tests/test_handler_metrics.py
Original file line number Diff line number Diff line change
@@ -1,38 +1,22 @@
from __future__ import annotations

from typing import Any, Dict, List, Tuple
from unittest import TestCase
from typing import Any, Dict, List
from unittest.mock import patch

from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import InMemoryMetricReader
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.sdk.trace.export.in_memory_span_exporter import (
InMemorySpanExporter,
)
from opentelemetry.semconv._incubating.attributes import (
gen_ai_attributes as GenAI,
)
from opentelemetry.semconv.schemas import Schemas
from opentelemetry.test.test_base import TestBase
from opentelemetry.util.genai.handler import TelemetryHandler
from opentelemetry.util.genai.types import Error, LLMInvocation

_DEFAULT_SCHEMA_URL = Schemas.V1_37_0.value

SCOPE = "opentelemetry.util.genai.handler"

class TelemetryHandlerMetricsTest(TestCase):
def setUp(self) -> None:
self.metric_reader = InMemoryMetricReader()
self.meter_provider = MeterProvider(
metric_readers=[self.metric_reader]
)
self.span_exporter = InMemorySpanExporter()
self.tracer_provider = TracerProvider()
self.tracer_provider.add_span_processor(
SimpleSpanProcessor(self.span_exporter)
)

class TelemetryHandlerMetricsTest(TestBase):
def test_stop_llm_records_duration_and_tokens(self) -> None:
handler = TelemetryHandler(
tracer_provider=self.tracer_provider,
Expand All @@ -52,10 +36,8 @@ def test_stop_llm_records_duration_and_tokens(self) -> None:
):
handler.stop_llm(invocation)

metrics, resource_metrics = self._harvest_metrics()
self._assert_metric_scope_schema_urls(
resource_metrics, _DEFAULT_SCHEMA_URL
)
self._assert_metric_scope_schema_urls(_DEFAULT_SCHEMA_URL)
metrics = self._harvest_metrics()
self.assertIn("gen_ai.client.operation.duration", metrics)
duration_points = metrics["gen_ai.client.operation.duration"]
self.assertEqual(len(duration_points), 1)
Expand Down Expand Up @@ -110,10 +92,8 @@ def test_stop_llm_records_duration_and_tokens_with_additional_attributes(
invocation.attributes = {"should not be on metrics": "value"}
handler.stop_llm(invocation)

metrics, resource_metrics = self._harvest_metrics()
self._assert_metric_scope_schema_urls(
resource_metrics, _DEFAULT_SCHEMA_URL
)
self._assert_metric_scope_schema_urls(_DEFAULT_SCHEMA_URL)
metrics = self._harvest_metrics()
self.assertIn("gen_ai.client.operation.duration", metrics)
duration_points = metrics["gen_ai.client.operation.duration"]
self.assertIn("gen_ai.client.token.usage", metrics)
Expand Down Expand Up @@ -148,10 +128,8 @@ def test_fail_llm_records_error_and_available_tokens(self) -> None:
):
handler.fail_llm(invocation, error)

metrics, resource_metrics = self._harvest_metrics()
self._assert_metric_scope_schema_urls(
resource_metrics, _DEFAULT_SCHEMA_URL
)
self._assert_metric_scope_schema_urls(_DEFAULT_SCHEMA_URL)
metrics = self._harvest_metrics()
self.assertIn("gen_ai.client.operation.duration", metrics)
duration_points = metrics["gen_ai.client.operation.duration"]
self.assertEqual(len(duration_points), 1)
Expand All @@ -177,35 +155,29 @@ def test_fail_llm_records_error_and_available_tokens(self) -> None:

def _harvest_metrics(
self,
) -> Tuple[Dict[str, List[Any]], List[Any]]:
) -> Dict[str, List[Any]]:
"""Returns (metrics_by_name, resource_metrics).

metrics_by_name maps metric name to list of data points.
resource_metrics is the raw ResourceMetrics list for scope-level
assertions (e.g. schema_url).
"""
try:
self.meter_provider.force_flush()
except Exception: # pylint: disable=broad-except
assert False, "force_flush raised an exception"
self.metric_reader.collect()
metrics = self.get_sorted_metrics(SCOPE)
metrics_by_name: Dict[str, List[Any]] = {}
metrics_data = self.metric_reader.get_metrics_data()
resource_metrics = (
metrics_data and metrics_data.resource_metrics
) or []
for resource_metric in resource_metrics:
for scope_metric in resource_metric.scope_metrics or []:
for metric in scope_metric.metrics or []:
points = metric.data.data_points or []
metrics_by_name.setdefault(metric.name, []).extend(points)
return metrics_by_name, resource_metrics
for metric in metrics or []:
points = metric.data.data_points or []
metrics_by_name.setdefault(metric.name, []).extend(points)
return metrics_by_name

def _assert_metric_scope_schema_urls(
self, resource_metrics: List[Any], expected_schema_url: str
self, expected_schema_url: str
) -> None:
for resource_metric in resource_metrics:
for (
resource_metric
) in self.memory_metrics_reader.get_metrics_data().resource_metrics:
for scope_metric in resource_metric.scope_metrics:
if scope_metric.scope.name != SCOPE:
continue
self.assertEqual(
scope_metric.scope.schema_url, expected_schema_url
)
Loading