diff --git a/ddtrace/contrib/internal/grpc/client_interceptor.py b/ddtrace/contrib/internal/grpc/client_interceptor.py index e55ea67fc50..0d6ddf227a1 100644 --- a/ddtrace/contrib/internal/grpc/client_interceptor.py +++ b/ddtrace/contrib/internal/grpc/client_interceptor.py @@ -13,6 +13,7 @@ from ddtrace.contrib import trace_utils from ddtrace.contrib.internal.grpc import constants from ddtrace.contrib.internal.grpc import utils +from ddtrace.contrib.internal.grpc.utils import is_otlp_export from ddtrace.ext import SpanKind from ddtrace.ext import SpanTypes from ddtrace.internal import core @@ -193,6 +194,13 @@ def __init__(self, pin, host, port): self._port = port def _intercept_client_call(self, method_kind, client_call_details): + metadata = [] + if client_call_details.metadata is not None: + metadata = list(client_call_details.metadata) + + if is_otlp_export(metadata): + return None, client_call_details + tracer: Tracer = self._pin.tracer # Instead of using .trace, create the span and activate it at points where we call the continuations @@ -228,9 +236,6 @@ def _intercept_client_call(self, method_kind, client_call_details): # NOTE: We need to pass the span to the HTTPPropagator since it isn't active at this point HTTPPropagator.inject(span.context, headers, span) - metadata = [] - if client_call_details.metadata is not None: - metadata = list(client_call_details.metadata) metadata.extend(headers.items()) client_call_details = _ClientCallDetails( @@ -247,6 +252,8 @@ def intercept_unary_unary(self, continuation, client_call_details, request): constants.GRPC_METHOD_KIND_UNARY, client_call_details, ) + if span is None: + return continuation(client_call_details, request) with _activated_span(self._pin.tracer, span): try: response = continuation(client_call_details, request) @@ -265,6 +272,8 @@ def intercept_unary_stream(self, continuation, client_call_details, request): constants.GRPC_METHOD_KIND_SERVER_STREAMING, client_call_details, ) + if span is None: + return continuation(client_call_details, request) with _activated_span(self._pin.tracer, span): response_iterator = continuation(client_call_details, request) response_iterator = _WrappedResponseCallFuture(response_iterator, span, self._pin.tracer) @@ -275,6 +284,8 @@ def intercept_stream_unary(self, continuation, client_call_details, request_iter constants.GRPC_METHOD_KIND_CLIENT_STREAMING, client_call_details, ) + if span is None: + return continuation(client_call_details, request_iterator) with _activated_span(self._pin.tracer, span): try: response = continuation(client_call_details, request_iterator) @@ -293,6 +304,8 @@ def intercept_stream_stream(self, continuation, client_call_details, request_ite constants.GRPC_METHOD_KIND_BIDI_STREAMING, client_call_details, ) + if span is None: + return continuation(client_call_details, request_iterator) with _activated_span(self._pin.tracer, span): response_iterator = continuation(client_call_details, request_iterator) response_iterator = _WrappedResponseCallFuture(response_iterator, span, self._pin.tracer) diff --git a/ddtrace/contrib/internal/grpc/constants.py b/ddtrace/contrib/internal/grpc/constants.py index 232c709de72..739651ac22f 100644 --- a/ddtrace/contrib/internal/grpc/constants.py +++ b/ddtrace/contrib/internal/grpc/constants.py @@ -24,3 +24,4 @@ GRPC_AIO_SERVICE_SERVER = "grpc-aio-server" GRPC_SERVICE_CLIENT = "grpc-client" GRPC_AIO_SERVICE_CLIENT = "grpc-aio-client" +USER_AGENT_HEADER = "user-agent" diff --git a/ddtrace/contrib/internal/grpc/utils.py b/ddtrace/contrib/internal/grpc/utils.py index 1f80b7875ba..4cc71b894d9 100644 --- a/ddtrace/contrib/internal/grpc/utils.py +++ b/ddtrace/contrib/internal/grpc/utils.py @@ -1,10 +1,14 @@ import ipaddress import logging import re +from typing import Tuple from urllib import parse +from ddtrace import config from ddtrace.contrib.internal.grpc import constants +from ddtrace.contrib.internal.grpc.constants import USER_AGENT_HEADER from ddtrace.ext import net +from ddtrace.internal.opentelemetry.constants import OTLP_EXPORTER_HEADER_IDENTIFIER log = logging.getLogger(__name__) @@ -108,3 +112,18 @@ def _parse_rpc_repr_string(rpc_string, module): # Return the status code and details return code, details + + +def is_otlp_export(metadata: Tuple) -> bool: + """ + Determine if a gRPC channel is submitting data to the OpenTelemetry OTLP exporter. + """ + if not (config._otel_logs_enabled or config._otel_metrics_enabled): + return False + + for key, value in metadata: + if key == USER_AGENT_HEADER: + normalized_value = value.lower().replace(" ", "-") + if OTLP_EXPORTER_HEADER_IDENTIFIER in normalized_value: + return True + return False diff --git a/ddtrace/contrib/internal/requests/connection.py b/ddtrace/contrib/internal/requests/connection.py index 4c01e02dfe5..4675aa16be9 100644 --- a/ddtrace/contrib/internal/requests/connection.py +++ b/ddtrace/contrib/internal/requests/connection.py @@ -3,17 +3,21 @@ from typing import Optional # noqa:F401 from urllib import parse +import requests + import ddtrace from ddtrace import config from ddtrace._trace.pin import Pin from ddtrace.constants import _SPAN_MEASURED_KEY from ddtrace.constants import SPAN_KIND from ddtrace.contrib import trace_utils +from ddtrace.contrib.internal.requests.constants import USER_AGENT_HEADER from ddtrace.contrib.internal.trace_utils import _sanitized_url from ddtrace.ext import SpanKind from ddtrace.ext import SpanTypes from ddtrace.internal.constants import COMPONENT from ddtrace.internal.logger import get_logger +from ddtrace.internal.opentelemetry.constants import OTLP_EXPORTER_HEADER_IDENTIFIER from ddtrace.internal.schema import schematize_url_operation from ddtrace.internal.schema.span_attribute_schema import SpanDirection from ddtrace.internal.utils import get_argument_value @@ -24,6 +28,17 @@ log = get_logger(__name__) +def is_otlp_export(request: requests.models.Request) -> bool: + """Determine if a request is submitting data to the OpenTelemetry OTLP exporter.""" + if not (config._otel_logs_enabled or config._otel_metrics_enabled): + return False + user_agent = request.headers.get(USER_AGENT_HEADER, "") + normalized_user_agent = user_agent.lower().replace(" ", "-") + if OTLP_EXPORTER_HEADER_IDENTIFIER in normalized_user_agent: + return True + return False + + def _extract_hostname_and_path(uri): # type: (str) -> str parsed_uri = parse.urlparse(uri) @@ -67,7 +82,7 @@ def _wrap_send(func, instance, args, kwargs): return func(*args, **kwargs) request = get_argument_value(args, kwargs, 0, "request") - if not request: + if not request or is_otlp_export(request): return func(*args, **kwargs) url = _sanitized_url(request.url) diff --git a/ddtrace/contrib/internal/requests/constants.py b/ddtrace/contrib/internal/requests/constants.py index c3f5eaca51f..386b7ecc790 100644 --- a/ddtrace/contrib/internal/requests/constants.py +++ b/ddtrace/contrib/internal/requests/constants.py @@ -1 +1,2 @@ DEFAULT_SERVICE = "requests" +USER_AGENT_HEADER = "User-Agent" diff --git a/ddtrace/internal/opentelemetry/constants.py b/ddtrace/internal/opentelemetry/constants.py new file mode 100644 index 00000000000..c3526cffdb9 --- /dev/null +++ b/ddtrace/internal/opentelemetry/constants.py @@ -0,0 +1 @@ +OTLP_EXPORTER_HEADER_IDENTIFIER = "otel-otlp-exporter-python" diff --git a/releasenotes/notes/otel-traced-export-445b99748c059bea.yaml b/releasenotes/notes/otel-traced-export-445b99748c059bea.yaml new file mode 100644 index 00000000000..a86720e4306 --- /dev/null +++ b/releasenotes/notes/otel-traced-export-445b99748c059bea.yaml @@ -0,0 +1,5 @@ +--- +fixes: + - | + otel: Prevents OpenTelemetry OTLP exporter connections from being traced by ddtrace. + ddtrace internal connections (gRPC and HTTP) are now excluded from tracing to prevent circular instrumentation. \ No newline at end of file diff --git a/tests/opentelemetry/test_logs.py b/tests/opentelemetry/test_logs.py index 4c0968a82a0..db677f0b611 100644 --- a/tests/opentelemetry/test_logs.py +++ b/tests/opentelemetry/test_logs.py @@ -504,3 +504,62 @@ def test_otel_trace_log_correlation(): assert ( int(attributes["span_id"], 16) == span_context.span_id ), f"Expected span_id_hex to be set to {attributes['span_id']} but found: {span_context.span_id}" + + +@pytest.mark.skipif( + EXPORTER_VERSION < MINIMUM_SUPPORTED_VERSION, + reason=f"OpenTelemetry exporter version {MINIMUM_SUPPORTED_VERSION} is required to export logs", +) +@pytest.mark.snapshot() +@pytest.mark.subprocess(ddtrace_run=True, env={"DD_LOGS_OTEL_ENABLED": "true"}) +def test_otel_logs_does_not_generate_client_grpc_spans(): + """ + Test that OpenTelemetry grpc logs exporter does not generate client grpc spans. + """ + from logging import getLogger + + from opentelemetry._logs import get_logger_provider + + from tests.opentelemetry.test_logs import create_mock_grpc_server + + logger = getLogger() + mock_service, server = create_mock_grpc_server() + + try: + server.start() + logger.error("test_otel_logs_grpc") + get_logger_provider().force_flush() + finally: + server.stop(0) + + assert mock_service.received_requests, "Expected gRPC log export requests but received none" + + +@pytest.mark.skipif( + EXPORTER_VERSION < MINIMUM_SUPPORTED_VERSION, + reason=f"OpenTelemetry exporter version {MINIMUM_SUPPORTED_VERSION} is required to export logs", +) +@pytest.mark.snapshot() +@pytest.mark.subprocess( + ddtrace_run=True, env={"DD_LOGS_OTEL_ENABLED": "true", "OTEL_EXPORTER_OTLP_PROTOCOL": "http/protobuf"} +) +def test_otel_logs_does_not_generate_client_http_spans(): + """Test that OpenTelemetry http logs exporter does not generate client spans.""" + from logging import getLogger + from unittest.mock import Mock + from unittest.mock import patch + + from opentelemetry._logs import get_logger_provider + + logger = getLogger() + with patch("requests.sessions.Session.request") as mock_request: + mock_request.return_value = Mock(status_code=200) + + logger.error("test_otel_logs_http") + get_logger_provider().force_flush() + + log_request_found = any( + len(call[0]) >= 2 and call[0][0] == "POST" and "/v1/logs" in call[0][1] + for call in mock_request.call_args_list + ) + assert log_request_found, f"Expected HTTP log export request but found none: {mock_request.call_args_list}" diff --git a/tests/snapshots/tests.opentelemetry.test_logs.test_otel_logs_does_not_generate_client_grpc_spans.json b/tests/snapshots/tests.opentelemetry.test_logs.test_otel_logs_does_not_generate_client_grpc_spans.json new file mode 100644 index 00000000000..4b74013f32b --- /dev/null +++ b/tests/snapshots/tests.opentelemetry.test_logs.test_otel_logs_does_not_generate_client_grpc_spans.json @@ -0,0 +1,35 @@ +[[ + { + "name": "grpc", + "service": "grpc-server", + "resource": "/opentelemetry.proto.collector.logs.v1.LogsService/Export", + "trace_id": 0, + "span_id": 1, + "parent_id": 0, + "type": "grpc", + "error": 0, + "meta": { + "_dd.base_service": "ddtrace_subprocess_dir", + "_dd.p.dm": "-0", + "_dd.p.tid": "68f8d7df00000000", + "component": "grpc_server", + "grpc.method.kind": "unary", + "grpc.method.name": "Export", + "grpc.method.package": "opentelemetry.proto.collector.logs.v1", + "grpc.method.path": "/opentelemetry.proto.collector.logs.v1.LogsService/Export", + "grpc.method.service": "LogsService", + "language": "python", + "rpc.service": "opentelemetry.proto.collector.logs.v1.LogsService", + "runtime-id": "f889b527dbdc40afa921bb54a08f8c90", + "span.kind": "server" + }, + "metrics": { + "_dd.measured": 1, + "_dd.top_level": 1, + "_dd.tracer_kr": 1.0, + "_sampling_priority_v1": 1, + "process_id": 32526 + }, + "duration": 31000, + "start": 1761138655401326000 + }]]