Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.

"""OTLP Exporter"""
"""OTLP Exporter

This module provides a mixin class for OTLP exporters that send telemetry data
to an OTLP-compatible receiver via gRPC. It includes configurable reconnection
logic to handle transient collector outages.


Comment thread
dheeraj-vanamala marked this conversation as resolved.
Outdated
"""

import random
import threading
Expand Down Expand Up @@ -251,20 +258,27 @@ def _get_credentials(
if certificate_file:
client_key_file = environ.get(client_key_file_env_key)
client_certificate_file = environ.get(client_certificate_file_env_key)
return _load_credentials(
credentials = _load_credentials(
certificate_file, client_key_file, client_certificate_file
)
if credentials is not None:
return credentials
return ssl_channel_credentials()


# pylint: disable=no-member
class OTLPExporterMixin(
ABC, Generic[SDKDataT, ExportServiceRequestT, ExportResultT, ExportStubT]
):
"""OTLP span exporter
"""OTLP gRPC exporter mixin.

This class provides the base functionality for OTLP exporters that send
telemetry data (spans or metrics) to an OTLP-compatible receiver via gRPC.
It includes a configurable reconnection mechanism to handle transient
receiver outages.

Args:
endpoint: OpenTelemetry Collector receiver endpoint
endpoint: OTLP-compatible receiver endpoint
insecure: Connection type
credentials: ChannelCredentials object for server authentication
headers: Headers to send when exporting
Expand Down Expand Up @@ -308,6 +322,8 @@ def __init__(
if parsed_url.netloc:
self._endpoint = parsed_url.netloc

self._insecure = insecure
self._credentials = credentials
self._headers = headers or environ.get(OTEL_EXPORTER_OTLP_HEADERS)
if isinstance(self._headers, str):
temp_headers = parse_env_headers(self._headers, liberal=True)
Expand Down Expand Up @@ -336,37 +352,51 @@ def __init__(
)
self._collector_kwargs = None

compression = (
self._compression = (
environ_to_compression(OTEL_EXPORTER_OTLP_COMPRESSION)
if compression is None
else compression
) or Compression.NoCompression

if insecure:
self._channel = insecure_channel(
self._endpoint,
compression=compression,
options=self._channel_options,
)
else:
self._channel = None
self._client = None

self._shutdown_in_progress = threading.Event()
self._shutdown = False

if not self._insecure:
self._credentials = _get_credentials(
credentials,
self._credentials,
_OTEL_PYTHON_EXPORTER_OTLP_GRPC_CREDENTIAL_PROVIDER,
OTEL_EXPORTER_OTLP_CERTIFICATE,
OTEL_EXPORTER_OTLP_CLIENT_KEY,
OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE,
)

self._initialize_channel_and_stub()
Comment thread
dheeraj-vanamala marked this conversation as resolved.

def _initialize_channel_and_stub(self):
    """Create a new gRPC channel and a client stub bound to it.

    Called at the end of ``__init__`` and again by the export path when it
    reinitializes the channel after an ``UNAVAILABLE`` error.

    Reads ``self._insecure``, ``self._endpoint``, ``self._credentials``,
    ``self._compression`` and ``self._channel_options``; overwrites
    ``self._channel`` and ``self._client``. This method does not close a
    previously assigned channel, so a caller that replaces a live channel
    must close it first.
    """
    if self._insecure:
        self._channel = insecure_channel(
            self._endpoint,
            compression=self._compression,
            options=self._channel_options,
        )
    else:
        # Use the compression stored on the instance: the local
        # ``compression`` from ``__init__`` is not in scope here, and
        # passing the keyword twice is a syntax error.
        self._channel = secure_channel(
            self._endpoint,
            self._credentials,
            compression=self._compression,
            options=self._channel_options,
        )
    self._client = self._stub(self._channel)  # type: ignore [reportCallIssue]

self._shutdown_in_progress = threading.Event()
self._shutdown = False

@abstractmethod
def _translate_data(
self,
Expand Down Expand Up @@ -407,6 +437,25 @@ def _export(
retry_info.retry_delay.seconds
+ retry_info.retry_delay.nanos / 1.0e9
)

# For UNAVAILABLE errors, reinitialize the channel to force reconnection
if error.code() == StatusCode.UNAVAILABLE and retry_num == 0: # type: ignore
logger.debug(
"Reinitializing gRPC channel for %s exporter due to UNAVAILABLE error",
self._exporting,
)
try:
self._channel.close()
except Exception as e:
logger.debug(
"Error closing channel for %s exporter to %s: %s",
self._exporting,
self._endpoint,
str(e),
)
# Enable channel reconnection for subsequent calls
self._initialize_channel_and_stub()

if (
error.code() not in _RETRYABLE_ERROR_CODES # type: ignore [reportAttributeAccessIssue]
or retry_num + 1 == _MAX_RETRYS
Expand Down Expand Up @@ -436,6 +485,12 @@ def _export(
return self._result.FAILURE # type: ignore [reportReturnType]

def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
"""
Shut down the exporter.

Args:
timeout_millis: Timeout in milliseconds for shutting down the exporter.
"""
if self._shutdown:
logger.warning("Exporter already shutdown, ignoring call")
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from unittest import TestCase
from unittest.mock import Mock, patch

import grpc
from google.protobuf.duration_pb2 import ( # pylint: disable=no-name-in-module
Duration,
)
Expand Down Expand Up @@ -89,8 +90,8 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
def _exporting(self):
return "traces"

def shutdown(self, timeout_millis=30_000):
return OTLPExporterMixin.shutdown(self, timeout_millis)
def shutdown(self, timeout_millis: float = 30_000, **kwargs):
return OTLPExporterMixin.shutdown(self, timeout_millis, **kwargs)


class TraceServiceServicerWithExportParams(TraceServiceServicer):
Expand Down Expand Up @@ -511,6 +512,16 @@ def test_timeout_set_correctly(self):
self.assertEqual(mock_trace_service.num_requests, 2)
self.assertAlmostEqual(after - before, 1.4, 1)

def test_channel_options_set_correctly(self):
    """Test that the exporter passes its channel options to insecure_channel"""
    # pylint: disable=protected-access
    # Patch the channel factory where the exporter module looks it up, so no
    # real channel is created and the call arguments can be inspected.
    with patch(
        "opentelemetry.exporter.otlp.proto.grpc.exporter.insecure_channel"
    ) as mock_channel:
        exporter = OTLPSpanExporterForTesting(insecure=True)
    self.assertTrue(mock_channel.called)
    # A bare "was called" check would still pass if the options were dropped;
    # verify the options actually reached the channel factory.
    _, channel_kwargs = mock_channel.call_args
    self.assertIn("options", channel_kwargs)
    self.assertEqual(channel_kwargs["options"], exporter._channel_options)

def test_otlp_headers_from_env(self):
# pylint: disable=protected-access
# This ensures that there is no other header than standard user-agent.
Expand All @@ -534,3 +545,27 @@ def test_permanent_failure(self):
warning.records[-1].message,
"Failed to export traces to localhost:4317, error code: StatusCode.ALREADY_EXISTS",
)

def test_unavailable_reconnects(self):
"""Test that the exporter reconnects on UNAVAILABLE error"""
add_TraceServiceServicer_to_server(
TraceServiceServicerWithExportParams(StatusCode.UNAVAILABLE),
self.server,
)

# Spy on grpc.insecure_channel to verify it's called for reconnection
with patch(
"opentelemetry.exporter.otlp.proto.grpc.exporter.insecure_channel",
side_effect=grpc.insecure_channel,
) as mock_insecure_channel:
# Mock sleep to avoid waiting
with patch("time.sleep"):
# We expect FAILURE because the server keeps returning UNAVAILABLE
# but we want to verify reconnection attempts happened
self.exporter.export([self.span])

# Verify that we attempted to reinitialize the channel (called insecure_channel)
# Since the initial channel was created in setUp (unpatched), this call
# must be from the reconnection logic.
self.assertTrue(mock_insecure_channel.called)
# Verify that reconnection enabled flag is set
Loading