Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions ddtrace/contrib/internal/grpc/client_interceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from ddtrace.contrib import trace_utils
from ddtrace.contrib.internal.grpc import constants
from ddtrace.contrib.internal.grpc import utils
from ddtrace.contrib.internal.grpc.utils import is_otlp_export
from ddtrace.ext import SpanKind
from ddtrace.ext import SpanTypes
from ddtrace.internal import core
Expand Down Expand Up @@ -193,6 +194,13 @@ def __init__(self, pin, host, port):
self._port = port

def _intercept_client_call(self, method_kind, client_call_details):
metadata = []
if client_call_details.metadata is not None:
metadata = list(client_call_details.metadata)

if is_otlp_export(metadata):
return None, client_call_details

tracer: Tracer = self._pin.tracer

# Instead of using .trace, create the span and activate it at points where we call the continuations
Expand Down Expand Up @@ -228,9 +236,6 @@ def _intercept_client_call(self, method_kind, client_call_details):
# NOTE: We need to pass the span to the HTTPPropagator since it isn't active at this point
HTTPPropagator.inject(span.context, headers, span)

metadata = []
if client_call_details.metadata is not None:
metadata = list(client_call_details.metadata)
metadata.extend(headers.items())

client_call_details = _ClientCallDetails(
Expand All @@ -247,6 +252,8 @@ def intercept_unary_unary(self, continuation, client_call_details, request):
constants.GRPC_METHOD_KIND_UNARY,
client_call_details,
)
if span is None:
return continuation(client_call_details, request)
with _activated_span(self._pin.tracer, span):
try:
response = continuation(client_call_details, request)
Expand All @@ -265,6 +272,8 @@ def intercept_unary_stream(self, continuation, client_call_details, request):
constants.GRPC_METHOD_KIND_SERVER_STREAMING,
client_call_details,
)
if span is None:
return continuation(client_call_details, request)
with _activated_span(self._pin.tracer, span):
response_iterator = continuation(client_call_details, request)
response_iterator = _WrappedResponseCallFuture(response_iterator, span, self._pin.tracer)
Expand All @@ -275,6 +284,8 @@ def intercept_stream_unary(self, continuation, client_call_details, request_iter
constants.GRPC_METHOD_KIND_CLIENT_STREAMING,
client_call_details,
)
if span is None:
return continuation(client_call_details, request_iterator)
with _activated_span(self._pin.tracer, span):
try:
response = continuation(client_call_details, request_iterator)
Expand All @@ -293,6 +304,8 @@ def intercept_stream_stream(self, continuation, client_call_details, request_ite
constants.GRPC_METHOD_KIND_BIDI_STREAMING,
client_call_details,
)
if span is None:
return continuation(client_call_details, request_iterator)
with _activated_span(self._pin.tracer, span):
response_iterator = continuation(client_call_details, request_iterator)
response_iterator = _WrappedResponseCallFuture(response_iterator, span, self._pin.tracer)
Expand Down
1 change: 1 addition & 0 deletions ddtrace/contrib/internal/grpc/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,4 @@
GRPC_AIO_SERVICE_SERVER = "grpc-aio-server"
GRPC_SERVICE_CLIENT = "grpc-client"
GRPC_AIO_SERVICE_CLIENT = "grpc-aio-client"
USER_AGENT_HEADER = "user-agent"
19 changes: 19 additions & 0 deletions ddtrace/contrib/internal/grpc/utils.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
import ipaddress
import logging
import re
from typing import Tuple
from urllib import parse

from ddtrace import config
from ddtrace.contrib.internal.grpc import constants
from ddtrace.contrib.internal.grpc.constants import USER_AGENT_HEADER
from ddtrace.ext import net
from ddtrace.internal.opentelemetry.constants import OTLP_EXPORTER_HEADER_IDENTIFIER


log = logging.getLogger(__name__)
Expand Down Expand Up @@ -108,3 +112,18 @@ def _parse_rpc_repr_string(rpc_string, module):

# Return the status code and details
return code, details


def is_otlp_export(metadata: Tuple) -> bool:
"""
Determine if a gRPC channel is submitting data to the OpenTelemetry OTLP exporter.
"""
if not (config._otel_logs_enabled or config._otel_metrics_enabled):
return False

for key, value in metadata:
if key == USER_AGENT_HEADER:
normalized_value = value.lower().replace(" ", "-")
if OTLP_EXPORTER_HEADER_IDENTIFIER in normalized_value:
return True
return False
17 changes: 16 additions & 1 deletion ddtrace/contrib/internal/requests/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,21 @@
from typing import Optional # noqa:F401
from urllib import parse

import requests

import ddtrace
from ddtrace import config
from ddtrace._trace.pin import Pin
from ddtrace.constants import _SPAN_MEASURED_KEY
from ddtrace.constants import SPAN_KIND
from ddtrace.contrib import trace_utils
from ddtrace.contrib.internal.requests.constants import USER_AGENT_HEADER
from ddtrace.contrib.internal.trace_utils import _sanitized_url
from ddtrace.ext import SpanKind
from ddtrace.ext import SpanTypes
from ddtrace.internal.constants import COMPONENT
from ddtrace.internal.logger import get_logger
from ddtrace.internal.opentelemetry.constants import OTLP_EXPORTER_HEADER_IDENTIFIER
from ddtrace.internal.schema import schematize_url_operation
from ddtrace.internal.schema.span_attribute_schema import SpanDirection
from ddtrace.internal.utils import get_argument_value
Expand All @@ -24,6 +28,17 @@
log = get_logger(__name__)


def is_otlp_export(request: requests.models.Request) -> bool:
"""Determine if a request is submitting data to the OpenTelemetry OTLP exporter."""
if not (config._otel_logs_enabled or config._otel_metrics_enabled):
return False
user_agent = request.headers.get(USER_AGENT_HEADER, "")
normalized_user_agent = user_agent.lower().replace(" ", "-")
if OTLP_EXPORTER_HEADER_IDENTIFIER in normalized_user_agent:
return True
return False


def _extract_hostname_and_path(uri):
# type: (str) -> str
parsed_uri = parse.urlparse(uri)
Expand Down Expand Up @@ -67,7 +82,7 @@ def _wrap_send(func, instance, args, kwargs):
return func(*args, **kwargs)

request = get_argument_value(args, kwargs, 0, "request")
if not request:
if not request or is_otlp_export(request):
return func(*args, **kwargs)

url = _sanitized_url(request.url)
Expand Down
1 change: 1 addition & 0 deletions ddtrace/contrib/internal/requests/constants.py
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
DEFAULT_SERVICE = "requests"
USER_AGENT_HEADER = "User-Agent"
1 change: 1 addition & 0 deletions ddtrace/internal/opentelemetry/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
OTLP_EXPORTER_HEADER_IDENTIFIER = "otel-otlp-exporter-python"
5 changes: 5 additions & 0 deletions releasenotes/notes/otel-traced-export-445b99748c059bea.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
fixes:
- |
otel: Prevents OpenTelemetry OTLP exporter connections from being traced by ddtrace.
ddtrace internal connections (gRPC and HTTP) are now excluded from tracing to prevent circular instrumentation.
59 changes: 59 additions & 0 deletions tests/opentelemetry/test_logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -504,3 +504,62 @@ def test_otel_trace_log_correlation():
assert (
int(attributes["span_id"], 16) == span_context.span_id
), f"Expected span_id_hex to be set to {attributes['span_id']} but found: {span_context.span_id}"


@pytest.mark.skipif(
EXPORTER_VERSION < MINIMUM_SUPPORTED_VERSION,
reason=f"OpenTelemetry exporter version {MINIMUM_SUPPORTED_VERSION} is required to export logs",
)
@pytest.mark.snapshot()
@pytest.mark.subprocess(ddtrace_run=True, env={"DD_LOGS_OTEL_ENABLED": "true"})
def test_otel_logs_does_not_generate_client_grpc_spans():
"""
Test that OpenTelemetry grpc logs exporter does not generate client grpc spans.
"""
from logging import getLogger

from opentelemetry._logs import get_logger_provider

from tests.opentelemetry.test_logs import create_mock_grpc_server

logger = getLogger()
mock_service, server = create_mock_grpc_server()

try:
server.start()
logger.error("test_otel_logs_grpc")
get_logger_provider().force_flush()
finally:
server.stop(0)

assert mock_service.received_requests, "Expected gRPC log export requests but received none"


@pytest.mark.skipif(
EXPORTER_VERSION < MINIMUM_SUPPORTED_VERSION,
reason=f"OpenTelemetry exporter version {MINIMUM_SUPPORTED_VERSION} is required to export logs",
)
@pytest.mark.snapshot()
@pytest.mark.subprocess(
ddtrace_run=True, env={"DD_LOGS_OTEL_ENABLED": "true", "OTEL_EXPORTER_OTLP_PROTOCOL": "http/protobuf"}
)
def test_otel_logs_does_not_generate_client_http_spans():
"""Test that OpenTelemetry http logs exporter does not generate client spans."""
from logging import getLogger
from unittest.mock import Mock
from unittest.mock import patch

from opentelemetry._logs import get_logger_provider

logger = getLogger()
with patch("requests.sessions.Session.request") as mock_request:
mock_request.return_value = Mock(status_code=200)

logger.error("test_otel_logs_http")
get_logger_provider().force_flush()

log_request_found = any(
len(call[0]) >= 2 and call[0][0] == "POST" and "/v1/logs" in call[0][1]
for call in mock_request.call_args_list
)
assert log_request_found, f"Expected HTTP log export request but found none: {mock_request.call_args_list}"
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
[[
{
"name": "grpc",
"service": "grpc-server",
"resource": "/opentelemetry.proto.collector.logs.v1.LogsService/Export",
"trace_id": 0,
"span_id": 1,
"parent_id": 0,
"type": "grpc",
"error": 0,
"meta": {
"_dd.base_service": "ddtrace_subprocess_dir",
"_dd.p.dm": "-0",
"_dd.p.tid": "68f8d7df00000000",
"component": "grpc_server",
"grpc.method.kind": "unary",
"grpc.method.name": "Export",
"grpc.method.package": "opentelemetry.proto.collector.logs.v1",
"grpc.method.path": "/opentelemetry.proto.collector.logs.v1.LogsService/Export",
"grpc.method.service": "LogsService",
"language": "python",
"rpc.service": "opentelemetry.proto.collector.logs.v1.LogsService",
"runtime-id": "f889b527dbdc40afa921bb54a08f8c90",
"span.kind": "server"
},
"metrics": {
"_dd.measured": 1,
"_dd.top_level": 1,
"_dd.tracer_kr": 1.0,
"_sampling_priority_v1": 1,
"process_id": 32526
},
"duration": 31000,
"start": 1761138655401326000
}]]
Loading