diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md b/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md index 478a2e1a41f7..bd9f2cbd852b 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md @@ -9,6 +9,8 @@ ### Bugs Fixed ### Other Changes +- Fix Ingestion-Side Sampling Disk Persist Behavior + ([#44980](https://github.com/Azure/azure-sdk-for-python/pull/44980)) ## 1.0.0b47 (2026-02-03) diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/_base.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/_base.py index e4a8c999c033..ebff4fef4812 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/_base.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/_base.py @@ -287,7 +287,16 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult: reach_ingestion = True resend_envelopes = [] for error in track_response.errors: - if _is_retryable_code(error.status_code): + # Check for sampling rejection - these should not be retried + # because the server will always reject them based on sampling rules + if _is_sampling_rejection(error.message): + if not self._is_stats_exporter(): + logger.info( + "Data dropped due to ingestion sampling: %s %s.", + error.message, + envelopes[error.index] if error.index is not None else "", + ) + elif _is_retryable_code(error.status_code): resend_envelopes.append(envelopes[error.index]) # type: ignore # Track retried items in customer sdkstats if self._should_collect_customer_sdkstats(): @@ -568,6 +577,22 @@ def _reached_ingestion_code(response_code: Optional[int]) -> bool: return response_code in _REACHED_INGESTION_STATUS_CODES +def _is_sampling_rejection(message: Optional[str]) -> bool: + """Determine if error message indicates ingestion-side sampling rejection. + + When the server rejects telemetry due to ingestion sampling, the error message + will be "Telemetry sampled out." These items should not be retried or persisted + because the server will always reject them based on sampling rules. + + :param str message: Error message from the server + :return: True if the error indicates a sampling rejection + :rtype: bool + """ + if message is None: + return False + return message.lower() == "telemetry sampled out." + + _MONITOR_DOMAIN_MAPPING = { "EventData": TelemetryEventData, "ExceptionData": TelemetryExceptionData, diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/test_base_exporter.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/test_base_exporter.py index f1ca99984f1b..097726261cfc 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/test_base_exporter.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/test_base_exporter.py @@ -15,6 +15,7 @@ _format_storage_telemetry_item, _get_auth_policy, _get_authentication_credential, + _is_sampling_rejection, BaseExporter, ExportResult, _get_storage_directory, @@ -731,6 +732,97 @@ def test_transmission_206_no_retry(self): self.assertEqual(result, ExportResult.FAILED_NOT_RETRYABLE) exporter.storage.put.assert_not_called() + def test_transmission_206_sampling_rejection_not_retried(self): + """Test that items rejected due to ingestion sampling are not retried or persisted.""" + exporter = BaseExporter(disable_offline_storage=True) + exporter.storage = mock.Mock() + custom_envelopes_to_export = [ + TelemetryItem(name="Test1", time=datetime.now()), + TelemetryItem(name="Test2", time=datetime.now()), + TelemetryItem(name="Test3", time=datetime.now()), + ] + with mock.patch.object(AzureMonitorClient, "track") as post: + post.return_value = TrackResponse( + items_received=3, + items_accepted=0, + errors=[ + # This item should NOT be retried due to sampling rejection + TelemetryErrorDetails( + index=0, + status_code=500, + message="Telemetry sampled out.", + ), + # This item should NOT be retried due to sampling rejection + TelemetryErrorDetails( + index=1, + status_code=500, + message="Telemetry sampled out.", + ), + # This item should be retried (normal timeout error) + TelemetryErrorDetails( + index=2, + status_code=408, + message="Timeout error", + ), + ], + ) + result = exporter._transmit(custom_envelopes_to_export) + # Should still attempt storage for the non-sampling rejection error + self.assertEqual(result, ExportResult.FAILED_NOT_RETRYABLE) + # Storage should be called only for the non-sampling error (index=2) + exporter.storage.put.assert_called_once() + # Verify only one envelope was stored (the one with non-sampling error) + stored_envelopes = exporter.storage.put.call_args[0][0] + self.assertEqual(len(stored_envelopes), 1) + + def test_transmission_206_all_sampling_rejections_no_storage(self): + """Test that when all items are rejected due to sampling, nothing is persisted.""" + exporter = BaseExporter(disable_offline_storage=True) + exporter.storage = mock.Mock() + custom_envelopes_to_export = [ + TelemetryItem(name="Test1", time=datetime.now()), + TelemetryItem(name="Test2", time=datetime.now()), + ] + with mock.patch.object(AzureMonitorClient, "track") as post: + post.return_value = TrackResponse( + items_received=2, + items_accepted=0, + errors=[ + TelemetryErrorDetails( + index=0, + status_code=500, + message="Telemetry sampled out.", + ), + TelemetryErrorDetails( + index=1, + status_code=500, + message="Telemetry sampled out.", + ), + ], + ) + result = exporter._transmit(custom_envelopes_to_export) + self.assertEqual(result, ExportResult.FAILED_NOT_RETRYABLE) + # Storage should NOT be called since all errors are sampling rejections + exporter.storage.put.assert_not_called() + + def test_is_sampling_rejection_true(self): + """Test that _is_sampling_rejection correctly identifies sampling rejection messages.""" + self.assertTrue(_is_sampling_rejection("Telemetry sampled out.")) + self.assertTrue(_is_sampling_rejection("telemetry sampled out.")) + self.assertTrue(_is_sampling_rejection("TELEMETRY SAMPLED OUT.")) + self.assertTrue(_is_sampling_rejection("Telemetry Sampled Out.")) + + def test_is_sampling_rejection_false(self): + """Test that _is_sampling_rejection returns False for non-sampling messages.""" + self.assertFalse(_is_sampling_rejection("Internal server error")) + self.assertFalse(_is_sampling_rejection("Bad request")) + self.assertFalse(_is_sampling_rejection("Timeout error")) + self.assertFalse(_is_sampling_rejection("")) + self.assertFalse(_is_sampling_rejection(None)) + # Similar messages that don't match exactly should return False + self.assertFalse(_is_sampling_rejection("Telemetry sampled out")) # Missing period + self.assertFalse(_is_sampling_rejection("Sampled out")) + def test_transmission_400(self): with mock.patch("requests.Session.request") as post: post.return_value = MockResponse(400, "{}")