From 3a26bfe8cad8e2f69e91b48c555b31762bf0d4f9 Mon Sep 17 00:00:00 2001 From: jonahgabriel Date: Tue, 14 Apr 2026 06:37:44 -0400 Subject: [PATCH 1/2] feat(observability): wire Phoenix OTLP tracing for local LLM routes [OMN-8697] - Add OTEL_EXPORTER_OTLP_ENDPOINT=http://phoenix:6006 and OTEL_TRACES_EXPORTER=otlp to x-runtime-env in docker-compose.infra.yml so all runtime containers export spans to Phoenix without needing the tracing bundle to be explicitly selected. - Include the tracing bundle in runtime bundle includes (bundles.yaml) so catalog-driven deploys also wire Phoenix and inject OTEL env vars. - Instrument _execute_llm_http_call in MixinLlmHttpTransport with a gen_ai.{chat,text_completion} OpenTelemetry span that records: gen_ai.request.model, gen_ai.system, gen_ai.operation.name, server.address, omninode.correlation_id, gen_ai.usage.input_tokens, gen_ai.usage.output_tokens (from response usage field). - Add TestOtelLlmSpan (4 tests) to verify span creation, token count extraction, text_completion vs chat detection, and span-on-error. --- docker/catalog/bundles.yaml | 1 + docker/docker-compose.infra.yml | 8 +- .../mixins/mixin_llm_http_transport.py | 401 ++++++++++-------- .../mixins/test_mixin_llm_http_transport.py | 143 +++++++ 4 files changed, 363 insertions(+), 190 deletions(-) diff --git a/docker/catalog/bundles.yaml b/docker/catalog/bundles.yaml index f63f46376d..d4b9543542 100644 --- a/docker/catalog/bundles.yaml +++ b/docker/catalog/bundles.yaml @@ -43,6 +43,7 @@ runtime: - autoheal includes: - core + - tracing # OMN-8697: always wire Phoenix OTLP tracing with the runtime observability: description: "LLM observability — Phoenix traces and evals" services: diff --git a/docker/docker-compose.infra.yml b/docker/docker-compose.infra.yml index edd31c0ffb..7a5050952d 100644 --- a/docker/docker-compose.infra.yml +++ b/docker/docker-compose.infra.yml @@ -211,9 +211,11 @@ x-runtime-env: &runtime-env LLM_CODER_FAST_URL: ${LLM_CODER_FAST_URL:?LLM_CODER_FAST_URL required for PluginLlm activation} LLM_EMBEDDING_URL: ${LLM_EMBEDDING_URL:?LLM_EMBEDDING_URL required for PluginLlm activation} LLM_DEEPSEEK_R1_URL: ${LLM_DEEPSEEK_R1_URL:?LLM_DEEPSEEK_R1_URL required for PluginLlm activation} - # --- OpenTelemetry (OMN-5382: removed from base env — injected per-bundle) --- - # OTEL vars are only present when the tracing bundle is selected. - # OTEL_SERVICE_NAME is set per-service (already done in service blocks). + # --- OpenTelemetry (OMN-8697: wired to Phoenix for LLM span visibility) --- + # Phoenix runs alongside the runtime; all containers export spans via OTLP HTTP. + # OTEL_SERVICE_NAME is set per-service (see individual service blocks below). + OTEL_EXPORTER_OTLP_ENDPOINT: "http://phoenix:6006" + OTEL_TRACES_EXPORTER: "otlp" # ========================================================================== # Services # ========================================================================== diff --git a/src/omnibase_infra/mixins/mixin_llm_http_transport.py b/src/omnibase_infra/mixins/mixin_llm_http_transport.py index 726ced5921..74b3de66be 100644 --- a/src/omnibase_infra/mixins/mixin_llm_http_transport.py +++ b/src/omnibase_infra/mixins/mixin_llm_http_transport.py @@ -656,47 +656,145 @@ async def _execute_llm_http_call( client = await self._get_http_client() - while retry_state.is_retriable(): - try: - # Check circuit breaker - await self._check_circuit_if_enabled(operation, correlation_id) - - # Make HTTP POST with HMAC signature header - # headers dict intentionally overrides any default HMAC_HEADER key on the client. - response = await client.post( - url, - json=payload, - headers={self.HMAC_HEADER: hmac_signature}, - timeout=effective_timeout, - ) + # ── OpenTelemetry LLM span (OMN-8697) ──────────────────────────────── + # Wraps the full retry loop so Phoenix captures model, total latency, + # and token counts from the final successful response. + from opentelemetry import trace as _otel_trace + + _gen_ai_op = "chat" if "messages" in payload else "text_completion" + with _otel_trace.get_tracer("omnibase_infra.llm").start_as_current_span( + f"gen_ai.{_gen_ai_op}", + attributes={ + "gen_ai.system": "openai", + "gen_ai.operation.name": _gen_ai_op, + "gen_ai.request.model": str(payload.get("model", "unknown")), + "server.address": url, + "omninode.correlation_id": str(correlation_id), + "omninode.target": self._llm_target_name, + }, + ) as _span: + while retry_state.is_retriable(): + try: + # Check circuit breaker + await self._check_circuit_if_enabled(operation, correlation_id) + + # Make HTTP POST with HMAC signature header + # headers dict intentionally overrides any default HMAC_HEADER key on the client. + response = await client.post( + url, + json=payload, + headers={self.HMAC_HEADER: hmac_signature}, + timeout=effective_timeout, + ) - # Handle non-2xx responses - if response.status_code < 200 or response.status_code >= 300: - error = self._map_http_status_to_error(response, correlation_id) + # Handle non-2xx responses + if response.status_code < 200 or response.status_code >= 300: + error = self._map_http_status_to_error(response, correlation_id) - # Special 429 handling: use Retry-After as delay - if response.status_code == 429: - retry_after = self._parse_retry_after(response) + # Special 429 handling: use Retry-After as delay + if response.status_code == 429: + retry_after = self._parse_retry_after(response) + next_state = retry_state.next_attempt( + error_message=f"Rate limited (429), retry after {retry_after}s", + ) + if next_state.is_retriable(): + logger.debug( + "Rate limited, waiting before retry", + extra={ + "retry_after_seconds": retry_after, + "attempt": next_state.attempt, + "max_attempts": next_state.max_attempts, + "correlation_id": str(correlation_id), + "target": self._llm_target_name, + }, + ) + await asyncio.sleep(retry_after) + retry_state = next_state + continue + raise error + + classification = self._classify_error(error, operation) + if classification.record_circuit_failure: + await self._record_circuit_failure_if_enabled( + operation, correlation_id + ) next_state = retry_state.next_attempt( - error_message=f"Rate limited (429), retry after {retry_after}s", + error_message=classification.error_message, ) - if next_state.is_retriable(): + if classification.should_retry and next_state.is_retriable(): + retry_state = next_state logger.debug( - "Rate limited, waiting before retry", + "Retrying after HTTP error", extra={ - "retry_after_seconds": retry_after, - "attempt": next_state.attempt, - "max_attempts": next_state.max_attempts, + "status_code": response.status_code, + "attempt": retry_state.attempt, + "max_attempts": retry_state.max_attempts, + "delay_seconds": retry_state.delay_seconds, "correlation_id": str(correlation_id), "target": self._llm_target_name, }, ) - await asyncio.sleep(retry_after) - retry_state = next_state + await asyncio.sleep(retry_state.delay_seconds) continue raise error - classification = self._classify_error(error, operation) + # 2xx response: validate content-type + # Only reject when a non-JSON content-type is explicitly present. + # Missing/empty content-type falls through to JSON parsing which + # will raise InfraProtocolError on its own if the body is invalid. + content_type = response.headers.get("content-type", "") + if content_type and "json" not in content_type.lower(): + await self._record_circuit_failure_if_enabled( + operation, correlation_id + ) + body_snippet = ( + sanitize_error_string(response.text) + if response.text + else "" + ) + ctx = self._build_error_context(operation, correlation_id) + raise InfraProtocolError( + f"Expected JSON response from {self._llm_target_name}, " + f"got content-type: {content_type}", + context=ctx, + status_code=response.status_code, + content_type=content_type, + response_body=body_snippet, + ) + + # Parse JSON + try: + data = cast("dict[str, JsonType]", response.json()) + except (JSONDecodeError, ValueError) as exc: + await self._record_circuit_failure_if_enabled( + operation, correlation_id + ) + body_snippet = ( + sanitize_error_string(response.text) + if response.text + else "" + ) + ctx = self._build_error_context(operation, correlation_id) + raise InfraProtocolError( + f"Failed to parse JSON response from {self._llm_target_name}: {exc}", + context=ctx, + status_code=response.status_code, + content_type=content_type, + response_body=body_snippet, + ) from exc + + # Success - reset circuit breaker and record token counts + await self._reset_circuit_if_enabled() + _usage = data.get("usage") or {} + if isinstance(_usage, dict): + if (_pt := _usage.get("prompt_tokens")) is not None: + _span.set_attribute("gen_ai.usage.input_tokens", int(_pt)) + if (_ct := _usage.get("completion_tokens")) is not None: + _span.set_attribute("gen_ai.usage.output_tokens", int(_ct)) + return data + + except httpx.ConnectError as exc: + classification = self._classify_error(exc, operation) if classification.record_circuit_failure: await self._record_circuit_failure_if_enabled( operation, correlation_id @@ -704,12 +802,11 @@ async def _execute_llm_http_call( next_state = retry_state.next_attempt( error_message=classification.error_message, ) - if classification.should_retry and next_state.is_retriable(): + if next_state.is_retriable(): retry_state = next_state logger.debug( - "Retrying after HTTP error", + "Retrying after connection error", extra={ - "status_code": response.status_code, "attempt": retry_state.attempt, "max_attempts": retry_state.max_attempts, "delay_seconds": retry_state.delay_seconds, @@ -719,171 +816,101 @@ async def _execute_llm_http_call( ) await asyncio.sleep(retry_state.delay_seconds) continue - raise error - - # 2xx response: validate content-type - # Only reject when a non-JSON content-type is explicitly present. - # Missing/empty content-type falls through to JSON parsing which - # will raise InfraProtocolError on its own if the body is invalid. - content_type = response.headers.get("content-type", "") - if content_type and "json" not in content_type.lower(): - await self._record_circuit_failure_if_enabled( - operation, correlation_id - ) - body_snippet = ( - sanitize_error_string(response.text) if response.text else "" - ) ctx = self._build_error_context(operation, correlation_id) - raise InfraProtocolError( - f"Expected JSON response from {self._llm_target_name}, " - f"got content-type: {content_type}", + raise InfraConnectionError( + f"Connection to {self._llm_target_name} failed after " + f"{retry_state.attempt + 1} attempts: {exc}", context=ctx, - status_code=response.status_code, - content_type=content_type, - response_body=body_snippet, - ) - - # Parse JSON - try: - data = cast("dict[str, JsonType]", response.json()) - except (JSONDecodeError, ValueError) as exc: - await self._record_circuit_failure_if_enabled( - operation, correlation_id - ) - body_snippet = ( - sanitize_error_string(response.text) if response.text else "" - ) - ctx = self._build_error_context(operation, correlation_id) - raise InfraProtocolError( - f"Failed to parse JSON response from {self._llm_target_name}: {exc}", - context=ctx, - status_code=response.status_code, - content_type=content_type, - response_body=body_snippet, ) from exc - # Success - reset circuit breaker - await self._reset_circuit_if_enabled() - return data - - except httpx.ConnectError as exc: - classification = self._classify_error(exc, operation) - if classification.record_circuit_failure: - await self._record_circuit_failure_if_enabled( - operation, correlation_id - ) - next_state = retry_state.next_attempt( - error_message=classification.error_message, - ) - if next_state.is_retriable(): - retry_state = next_state - logger.debug( - "Retrying after connection error", - extra={ - "attempt": retry_state.attempt, - "max_attempts": retry_state.max_attempts, - "delay_seconds": retry_state.delay_seconds, - "correlation_id": str(correlation_id), - "target": self._llm_target_name, - }, - ) - await asyncio.sleep(retry_state.delay_seconds) - continue - ctx = self._build_error_context(operation, correlation_id) - raise InfraConnectionError( - f"Connection to {self._llm_target_name} failed after " - f"{retry_state.attempt + 1} attempts: {exc}", - context=ctx, - ) from exc - - except httpx.TimeoutException as exc: - classification = self._classify_error(exc, operation) - if classification.record_circuit_failure: - await self._record_circuit_failure_if_enabled( - operation, correlation_id + except httpx.TimeoutException as exc: + classification = self._classify_error(exc, operation) + if classification.record_circuit_failure: + await self._record_circuit_failure_if_enabled( + operation, correlation_id + ) + next_state = retry_state.next_attempt( + error_message=classification.error_message, ) - next_state = retry_state.next_attempt( - error_message=classification.error_message, - ) - if next_state.is_retriable(): - retry_state = next_state - logger.debug( - "Retrying after timeout", - extra={ - "attempt": retry_state.attempt, - "max_attempts": retry_state.max_attempts, - "delay_seconds": retry_state.delay_seconds, - "timeout_seconds": effective_timeout, - "correlation_id": str(correlation_id), - "target": self._llm_target_name, - }, + if next_state.is_retriable(): + retry_state = next_state + logger.debug( + "Retrying after timeout", + extra={ + "attempt": retry_state.attempt, + "max_attempts": retry_state.max_attempts, + "delay_seconds": retry_state.delay_seconds, + "timeout_seconds": effective_timeout, + "correlation_id": str(correlation_id), + "target": self._llm_target_name, + }, + ) + await asyncio.sleep(retry_state.delay_seconds) + continue + timeout_ctx = ModelTimeoutErrorContext( + transport_type=EnumInfraTransportType.HTTP, + operation=operation, + target_name=self._llm_target_name, + correlation_id=correlation_id, + timeout_seconds=effective_timeout, ) - await asyncio.sleep(retry_state.delay_seconds) - continue - timeout_ctx = ModelTimeoutErrorContext( - transport_type=EnumInfraTransportType.HTTP, - operation=operation, - target_name=self._llm_target_name, - correlation_id=correlation_id, - timeout_seconds=effective_timeout, - ) - raise InfraTimeoutError( - f"Request to {self._llm_target_name} timed out after " - f"{effective_timeout}s ({retry_state.attempt + 1} attempts)", - context=timeout_ctx, - ) from exc + raise InfraTimeoutError( + f"Request to {self._llm_target_name} timed out after " + f"{effective_timeout}s ({retry_state.attempt + 1} attempts)", + context=timeout_ctx, + ) from exc - except ( - InfraRateLimitedError, - InfraRequestRejectedError, - InfraAuthenticationError, - ProtocolConfigurationError, - InfraProtocolError, - ): - raise # Already typed, don't wrap - - except InfraUnavailableError: - raise # Already handled (e.g., circuit breaker open) - - except Exception as exc: - # Unexpected error - classify and handle - classification = self._classify_error(exc, operation) - if classification.record_circuit_failure: - await self._record_circuit_failure_if_enabled( - operation, correlation_id - ) - next_state = retry_state.next_attempt( - error_message=classification.error_message, - ) - if classification.should_retry and next_state.is_retriable(): - retry_state = next_state - logger.debug( - "Retrying after unexpected error", - extra={ - "error_type": type(exc).__name__, - "attempt": retry_state.attempt, - "max_attempts": retry_state.max_attempts, - "delay_seconds": retry_state.delay_seconds, - "correlation_id": str(correlation_id), - "target": self._llm_target_name, - }, + except ( + InfraRateLimitedError, + InfraRequestRejectedError, + InfraAuthenticationError, + ProtocolConfigurationError, + InfraProtocolError, + ): + raise # Already typed, don't wrap + + except InfraUnavailableError: + raise # Already handled (e.g., circuit breaker open) + + except Exception as exc: + # Unexpected error - classify and handle + classification = self._classify_error(exc, operation) + if classification.record_circuit_failure: + await self._record_circuit_failure_if_enabled( + operation, correlation_id + ) + next_state = retry_state.next_attempt( + error_message=classification.error_message, ) - await asyncio.sleep(retry_state.delay_seconds) - continue - ctx = self._build_error_context(operation, correlation_id) - raise InfraConnectionError( - f"Unexpected error calling {self._llm_target_name}: " - f"{type(exc).__name__}: {exc}", - context=ctx, - ) from exc + if classification.should_retry and next_state.is_retriable(): + retry_state = next_state + logger.debug( + "Retrying after unexpected error", + extra={ + "error_type": type(exc).__name__, + "attempt": retry_state.attempt, + "max_attempts": retry_state.max_attempts, + "delay_seconds": retry_state.delay_seconds, + "correlation_id": str(correlation_id), + "target": self._llm_target_name, + }, + ) + await asyncio.sleep(retry_state.delay_seconds) + continue + ctx = self._build_error_context(operation, correlation_id) + raise InfraConnectionError( + f"Unexpected error calling {self._llm_target_name}: " + f"{type(exc).__name__}: {exc}", + context=ctx, + ) from exc - # Loop exited without return or raise - all retries exhausted - ctx = self._build_error_context(operation, correlation_id) - raise InfraUnavailableError( - f"All retry attempts exhausted for {self._llm_target_name} " - f"({total_attempts} attempts)", - context=ctx, - ) + # Loop exited without return or raise - all retries exhausted + ctx = self._build_error_context(operation, correlation_id) + raise InfraUnavailableError( + f"All retry attempts exhausted for {self._llm_target_name} " + f"({total_attempts} attempts)", + context=ctx, + ) # ── HTTP status to error mapping ───────────────────────────────────── diff --git a/tests/unit/mixins/test_mixin_llm_http_transport.py b/tests/unit/mixins/test_mixin_llm_http_transport.py index f478745f31..6dce66f24c 100644 --- a/tests/unit/mixins/test_mixin_llm_http_transport.py +++ b/tests/unit/mixins/test_mixin_llm_http_transport.py @@ -3,6 +3,7 @@ """Unit tests for MixinLlmHttpTransport. This test suite validates: +- OpenTelemetry LLM span creation (OMN-8697) - HTTP status code to typed exception mapping (401, 403, 404, 429, 400, 422, 500-504) - 429 does NOT increment circuit breaker failure count - Retry-After header parsing (present, absent, unparseable, capped) @@ -1877,3 +1878,145 @@ def handler(request: httpx.Request) -> httpx.Response: ) assert result == {"result": "ok"} + + +# ── OpenTelemetry LLM Span (OMN-8697) ──────────────────────────────────── + + +@pytest.mark.unit +class TestOtelLlmSpan: + """Verify that _execute_llm_http_call emits a gen_ai span to the active tracer. + + Uses InMemorySpanExporter so we don't need a running Phoenix instance. + The global TracerProvider is restored after each test to avoid cross-test + pollution. + """ + + @pytest.fixture(autouse=True) + def _install_in_memory_tracer(self) -> Generator[None, None, None]: + """Patch opentelemetry.trace.get_tracer to return a per-test in-memory tracer. + + OTel only allows set_tracer_provider once per process. Instead, we patch + get_tracer directly so each test gets an isolated InMemorySpanExporter + without touching the global TracerProvider. + """ + from opentelemetry.sdk.resources import Resource + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.trace.export import SimpleSpanProcessor + from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( + InMemorySpanExporter, + ) + + self._exporter = InMemorySpanExporter() + _provider = TracerProvider(resource=Resource.create({"service.name": "test"})) + _provider.add_span_processor(SimpleSpanProcessor(self._exporter)) + _tracer = _provider.get_tracer("omnibase_infra.llm") + + with patch("opentelemetry.trace.get_tracer", return_value=_tracer): + yield + + async def test_span_created_on_success(self, correlation_id: UUID) -> None: + """A gen_ai span with model + server attributes is emitted on a successful call.""" + response_body = { + "choices": [{"message": {"content": "pong"}}], + "usage": {"prompt_tokens": 10, "completion_tokens": 5}, + } + + def handler(request: httpx.Request) -> httpx.Response: + return _json_response(response_body) + + client = _make_mock_client(handler) + harness = LlmTransportHarness(http_client=client) + payload = { + "model": "qwen3-coder", + "messages": [{"role": "user", "content": "ping"}], + } + + await harness._execute_llm_http_call( + url=URL, + payload=payload, + correlation_id=correlation_id, + ) + + spans = self._exporter.get_finished_spans() + assert len(spans) == 1 + span = spans[0] + assert span.name == "gen_ai.chat" + assert span.attributes["gen_ai.request.model"] == "qwen3-coder" + assert span.attributes["gen_ai.system"] == "openai" + assert span.attributes["gen_ai.operation.name"] == "chat" + assert span.attributes["server.address"] == URL + + async def test_span_records_token_counts(self, correlation_id: UUID) -> None: + """Token counts from the response usage field are set as span attributes.""" + response_body = { + "choices": [], + "usage": {"prompt_tokens": 42, "completion_tokens": 17}, + } + + def handler(request: httpx.Request) -> httpx.Response: + return _json_response(response_body) + + client = _make_mock_client(handler) + harness = LlmTransportHarness(http_client=client) + payload = { + "model": "deepseek-r1", + "messages": [{"role": "user", "content": "hi"}], + } + + await harness._execute_llm_http_call( + url=URL, + payload=payload, + correlation_id=correlation_id, + ) + + spans = self._exporter.get_finished_spans() + assert len(spans) == 1 + span = spans[0] + assert span.attributes.get("gen_ai.usage.input_tokens") == 42 + assert span.attributes.get("gen_ai.usage.output_tokens") == 17 + + async def test_span_uses_text_completion_op_without_messages( + self, correlation_id: UUID + ) -> None: + """Payload without 'messages' key results in 'text_completion' operation name.""" + + def handler(request: httpx.Request) -> httpx.Response: + return _json_response({"result": "ok"}) + + client = _make_mock_client(handler) + harness = LlmTransportHarness(http_client=client) + payload = {"model": "qwen3-coder", "prompt": "Say hello"} + + await harness._execute_llm_http_call( + url=URL, + payload=payload, + correlation_id=correlation_id, + ) + + spans = self._exporter.get_finished_spans() + assert len(spans) == 1 + span = spans[0] + assert span.name == "gen_ai.text_completion" + assert span.attributes["gen_ai.operation.name"] == "text_completion" + + async def test_span_emitted_even_on_error(self, correlation_id: UUID) -> None: + """A span is still finished (and recorded as error) when the call fails.""" + + def handler(request: httpx.Request) -> httpx.Response: + return _json_response({"error": "bad request"}, status_code=400) + + client = _make_mock_client(handler) + harness = LlmTransportHarness(http_client=client, cb_threshold=10) + payload = {"model": "qwen3-coder", "messages": []} + + with pytest.raises(InfraRequestRejectedError): + await harness._execute_llm_http_call( + url=URL, + payload=payload, + correlation_id=correlation_id, + max_retries=0, + ) + + spans = self._exporter.get_finished_spans() + assert len(spans) == 1 From b31144387bbf9c422a873d92fac76cf63a3786af Mon Sep 17 00:00:00 2001 From: jonahgabriel Date: Tue, 14 Apr 2026 06:41:02 -0400 Subject: [PATCH 2/2] fix(observability): guard token count cast with isinstance to satisfy mypy [OMN-8697] --- src/omnibase_infra/mixins/mixin_llm_http_transport.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/omnibase_infra/mixins/mixin_llm_http_transport.py b/src/omnibase_infra/mixins/mixin_llm_http_transport.py index 74b3de66be..0389f2e1c4 100644 --- a/src/omnibase_infra/mixins/mixin_llm_http_transport.py +++ b/src/omnibase_infra/mixins/mixin_llm_http_transport.py @@ -787,9 +787,11 @@ async def _execute_llm_http_call( await self._reset_circuit_if_enabled() _usage = data.get("usage") or {} if isinstance(_usage, dict): - if (_pt := _usage.get("prompt_tokens")) is not None: + _pt = _usage.get("prompt_tokens") + if isinstance(_pt, (int, float)): _span.set_attribute("gen_ai.usage.input_tokens", int(_pt)) - if (_ct := _usage.get("completion_tokens")) is not None: + _ct = _usage.get("completion_tokens") + if isinstance(_ct, (int, float)): _span.set_attribute("gen_ai.usage.output_tokens", int(_ct)) return data