Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
### Bugs Fixed

### Other Changes
- Fix Ingestion-Side Sampling Disk Persist Behavior
([#44980](https://github.com/Azure/azure-sdk-for-python/pull/44980))

## 1.0.0b47 (2026-02-03)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,16 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult:
reach_ingestion = True
resend_envelopes = []
for error in track_response.errors:
if _is_retryable_code(error.status_code):
# Check for sampling rejection - these should not be retried
# because the server will always reject them based on sampling rules
if _is_sampling_rejection(error.message):
if not self._is_stats_exporter():
logger.info(
"Data dropped due to ingestion sampling: %s %s.",
error.message,
envelopes[error.index] if error.index is not None else "",
)
elif _is_retryable_code(error.status_code):
resend_envelopes.append(envelopes[error.index]) # type: ignore
# Track retried items in customer sdkstats
if self._should_collect_customer_sdkstats():
Expand Down Expand Up @@ -568,6 +577,22 @@ def _reached_ingestion_code(response_code: Optional[int]) -> bool:
return response_code in _REACHED_INGESTION_STATUS_CODES


def _is_sampling_rejection(message: Optional[str]) -> bool:
"""Determine if error message indicates ingestion-side sampling rejection.

When the server rejects telemetry due to ingestion sampling, the error message
will be "Telemetry sampled out." These items should not be retried or persisted
because the server will always reject them based on sampling rules.

:param str message: Error message from the server
:return: True if the error indicates a sampling rejection
:rtype: bool
"""
if message is None:
return False
return message.lower() == "telemetry sampled out."


_MONITOR_DOMAIN_MAPPING = {
"EventData": TelemetryEventData,
"ExceptionData": TelemetryExceptionData,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
_format_storage_telemetry_item,
_get_auth_policy,
_get_authentication_credential,
_is_sampling_rejection,
BaseExporter,
ExportResult,
_get_storage_directory,
Expand Down Expand Up @@ -731,6 +732,97 @@ def test_transmission_206_no_retry(self):
self.assertEqual(result, ExportResult.FAILED_NOT_RETRYABLE)
exporter.storage.put.assert_not_called()

def test_transmission_206_sampling_rejection_not_retried(self):
"""Test that items rejected due to ingestion sampling are not retried or persisted."""
exporter = BaseExporter(disable_offline_storage=True)
exporter.storage = mock.Mock()
custom_envelopes_to_export = [
TelemetryItem(name="Test1", time=datetime.now()),
TelemetryItem(name="Test2", time=datetime.now()),
TelemetryItem(name="Test3", time=datetime.now()),
]
with mock.patch.object(AzureMonitorClient, "track") as post:
post.return_value = TrackResponse(
items_received=3,
items_accepted=0,
errors=[
# This item should NOT be retried due to sampling rejection
TelemetryErrorDetails(
index=0,
status_code=500,
message="Telemetry sampled out.",
),
# This item should NOT be retried due to sampling rejection
TelemetryErrorDetails(
index=1,
status_code=500,
message="Telemetry sampled out.",
),
# This item should be retried (normal timeout error)
TelemetryErrorDetails(
index=2,
status_code=408,
message="Timeout error",
),
],
)
result = exporter._transmit(custom_envelopes_to_export)
# Should still attempt storage for the non-sampling rejection error
self.assertEqual(result, ExportResult.FAILED_NOT_RETRYABLE)
# Storage should be called only for the non-sampling error (index=2)
exporter.storage.put.assert_called_once()
# Verify only one envelope was stored (the one with non-sampling error)
stored_envelopes = exporter.storage.put.call_args[0][0]
self.assertEqual(len(stored_envelopes), 1)

def test_transmission_206_all_sampling_rejections_no_storage(self):
"""Test that when all items are rejected due to sampling, nothing is persisted."""
exporter = BaseExporter(disable_offline_storage=True)
exporter.storage = mock.Mock()
custom_envelopes_to_export = [
TelemetryItem(name="Test1", time=datetime.now()),
TelemetryItem(name="Test2", time=datetime.now()),
]
with mock.patch.object(AzureMonitorClient, "track") as post:
post.return_value = TrackResponse(
items_received=2,
items_accepted=0,
errors=[
TelemetryErrorDetails(
index=0,
status_code=500,
message="Telemetry sampled out.",
),
TelemetryErrorDetails(
index=1,
status_code=500,
message="Telemetry sampled out.",
),
],
)
result = exporter._transmit(custom_envelopes_to_export)
self.assertEqual(result, ExportResult.FAILED_NOT_RETRYABLE)
# Storage should NOT be called since all errors are sampling rejections
exporter.storage.put.assert_not_called()

def test_is_sampling_rejection_true(self):
"""Test that _is_sampling_rejection correctly identifies sampling rejection messages."""
self.assertTrue(_is_sampling_rejection("Telemetry sampled out."))
self.assertTrue(_is_sampling_rejection("telemetry sampled out."))
self.assertTrue(_is_sampling_rejection("TELEMETRY SAMPLED OUT."))
self.assertTrue(_is_sampling_rejection("Telemetry Sampled Out."))

def test_is_sampling_rejection_false(self):
"""Test that _is_sampling_rejection returns False for non-sampling messages."""
self.assertFalse(_is_sampling_rejection("Internal server error"))
self.assertFalse(_is_sampling_rejection("Bad request"))
self.assertFalse(_is_sampling_rejection("Timeout error"))
self.assertFalse(_is_sampling_rejection(""))
self.assertFalse(_is_sampling_rejection(None))
# Similar messages that don't match exactly should return False
self.assertFalse(_is_sampling_rejection("Telemetry sampled out")) # Missing period
self.assertFalse(_is_sampling_rejection("Sampled out"))

def test_transmission_400(self):
with mock.patch("requests.Session.request") as post:
post.return_value = MockResponse(400, "{}")
Expand Down