From 3e81cdb95741c9d6e914eb133a0b44eff5fe6647 Mon Sep 17 00:00:00 2001 From: Jackson Weber Date: Mon, 2 Feb 2026 16:40:35 -0800 Subject: [PATCH 1/5] Fix ingestion sampling 206 issue. --- .../opentelemetry/exporter/export/_base.py | 37 +++++++- .../tests/test_base_exporter.py | 95 +++++++++++++++++++ 2 files changed, 131 insertions(+), 1 deletion(-) diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/_base.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/_base.py index e4a8c999c033..43486f7688b4 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/_base.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/_base.py @@ -287,7 +287,16 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult: reach_ingestion = True resend_envelopes = [] for error in track_response.errors: - if _is_retryable_code(error.status_code): + # Check for sampling rejection - these should not be retried + # because the server will always reject them based on sampling rules + if _is_sampling_rejection(error.message): + if not self._is_stats_exporter(): + logger.info( + "Data dropped due to ingestion sampling: %s %s.", + error.message, + envelopes[error.index] if error.index is not None else "", + ) + elif _is_retryable_code(error.status_code): resend_envelopes.append(envelopes[error.index]) # type: ignore # Track retried items in customer sdkstats if self._should_collect_customer_sdkstats(): @@ -568,6 +577,32 @@ def _reached_ingestion_code(response_code: Optional[int]) -> bool: return response_code in _REACHED_INGESTION_STATUS_CODES +def _is_sampling_rejection(message: Optional[str]) -> bool: + """Determine if error message indicates ingestion-side sampling rejection. + + When the server rejects telemetry due to ingestion sampling, the error message + will contain sampling-related keywords. These items should not be retried or persisted + because the server will always reject them based on sampling rules. + + Checks for common sampling-related rejection messages from Breeze: + - "sampled out" + - "sampling" + - "filtered by sampling" + + :param str message: Error message from the server + :return: True if the error indicates a sampling rejection + :rtype: bool + """ + if message is None: + return False + lower_message = message.lower() + return ( + "sampled out" in lower_message + or "sampling" in lower_message + or "filtered by sampling" in lower_message + ) + + _MONITOR_DOMAIN_MAPPING = { "EventData": TelemetryEventData, "ExceptionData": TelemetryExceptionData, diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/test_base_exporter.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/test_base_exporter.py index f1ca99984f1b..07b9f0550547 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/test_base_exporter.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/test_base_exporter.py @@ -15,6 +15,7 @@ _format_storage_telemetry_item, _get_auth_policy, _get_authentication_credential, + _is_sampling_rejection, BaseExporter, ExportResult, _get_storage_directory, @@ -731,6 +732,100 @@ def test_transmission_206_no_retry(self): self.assertEqual(result, ExportResult.FAILED_NOT_RETRYABLE) exporter.storage.put.assert_not_called() + def test_transmission_206_sampling_rejection_not_retried(self): + """Test that items rejected due to ingestion sampling are not retried or persisted.""" + exporter = BaseExporter(disable_offline_storage=True) + exporter.storage = mock.Mock() + custom_envelopes_to_export = [ + TelemetryItem(name="Test1", time=datetime.now()), + TelemetryItem(name="Test2", time=datetime.now()), + TelemetryItem(name="Test3", time=datetime.now()), + ] + with mock.patch.object(AzureMonitorClient, "track") as post: + post.return_value = TrackResponse( + items_received=3, + items_accepted=0, + errors=[ + # This item should NOT be retried due to sampling rejection (sampled out) + TelemetryErrorDetails( + index=0, + status_code=500, + message="Sampled out by ingestion sampling", + ), + # This item should NOT be retried due to sampling rejection (filtered by sampling) + TelemetryErrorDetails( + index=1, + status_code=500, + message="Filtered by sampling policy", + ), + # This item should be retried (normal timeout error) + TelemetryErrorDetails( + index=2, + status_code=408, + message="Timeout error", + ), + ], + ) + result = exporter._transmit(custom_envelopes_to_export) + # Should still attempt storage for the non-sampling rejection error + self.assertEqual(result, ExportResult.FAILED_NOT_RETRYABLE) + # Storage should be called only for the non-sampling error (index=2) + exporter.storage.put.assert_called_once() + # Verify only one envelope was stored (the one with non-sampling error) + stored_envelopes = exporter.storage.put.call_args[0][0] + self.assertEqual(len(stored_envelopes), 1) + + def test_transmission_206_all_sampling_rejections_no_storage(self): + """Test that when all items are rejected due to sampling, nothing is persisted.""" + exporter = BaseExporter(disable_offline_storage=True) + exporter.storage = mock.Mock() + custom_envelopes_to_export = [ + TelemetryItem(name="Test1", time=datetime.now()), + TelemetryItem(name="Test2", time=datetime.now()), + ] + with mock.patch.object(AzureMonitorClient, "track") as post: + post.return_value = TrackResponse( + items_received=2, + items_accepted=0, + errors=[ + TelemetryErrorDetails( + index=0, + status_code=500, + message="Sampled out by ingestion sampling", + ), + TelemetryErrorDetails( + index=1, + status_code=500, + message="Filtered by sampling policy", + ), + ], + ) + result = exporter._transmit(custom_envelopes_to_export) + self.assertEqual(result, ExportResult.FAILED_NOT_RETRYABLE) + # Storage should NOT be called since all errors are sampling rejections + exporter.storage.put.assert_not_called() + + def test_is_sampling_rejection_true(self): + """Test that _is_sampling_rejection correctly identifies sampling rejection messages.""" + # Test "sampled out" pattern + self.assertTrue(_is_sampling_rejection("Sampled out by ingestion sampling")) + self.assertTrue(_is_sampling_rejection("SAMPLED OUT")) + # Test "sampling" pattern + self.assertTrue(_is_sampling_rejection("Filtered by sampling policy")) + self.assertTrue(_is_sampling_rejection("Item rejected due to sampling")) + self.assertTrue(_is_sampling_rejection("SAMPLING")) + # Test "filtered by sampling" pattern + self.assertTrue(_is_sampling_rejection("filtered by sampling")) + self.assertTrue(_is_sampling_rejection("FILTERED BY SAMPLING")) + + def test_is_sampling_rejection_false(self): + """Test that _is_sampling_rejection returns False for non-sampling messages.""" + self.assertFalse(_is_sampling_rejection("Internal server error")) + self.assertFalse(_is_sampling_rejection("Bad request")) + self.assertFalse(_is_sampling_rejection("Timeout error")) + self.assertFalse(_is_sampling_rejection("")) + self.assertFalse(_is_sampling_rejection(None)) + def test_transmission_400(self): with mock.patch("requests.Session.request") as post: post.return_value = MockResponse(400, "{}") From d297b077cb50b798456104687ed8a1673d079f03 Mon Sep 17 00:00:00 2001 From: Jackson Weber Date: Tue, 3 Feb 2026 09:43:31 -0800 Subject: [PATCH 2/5] Update CHANGELOG.md --- .../azure-monitor-opentelemetry-exporter/CHANGELOG.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md b/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md index d5d33e275d38..44f860a1a97b 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md @@ -1,5 +1,11 @@ # Release History +## 1.0.0b48 (Unreleased) + +### Other Changes +- Fix Ingestion-Side Sampling Disk Persist Behavior + ([#37214]https://github.com/Azure/azure-sdk-for-js/pull/37214) + ## 1.0.0b47 (2026-02-03) ### Features Added From fc5b8b35df27b9aaec029cde0298948a66b69fe8 Mon Sep 17 00:00:00 2001 From: Jackson Weber Date: Tue, 3 Feb 2026 12:55:04 -0800 Subject: [PATCH 3/5] Make telemetry sampled message more specific and update tests. --- .../opentelemetry/exporter/export/_base.py | 14 ++-------- .../tests/test_base_exporter.py | 27 ++++++++----------- 2 files changed, 13 insertions(+), 28 deletions(-) diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/_base.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/_base.py index 43486f7688b4..6a7d77cfb8de 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/_base.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/_base.py @@ -581,26 +581,16 @@ def _is_sampling_rejection(message: Optional[str]) -> bool: """Determine if error message indicates ingestion-side sampling rejection. When the server rejects telemetry due to ingestion sampling, the error message - will contain sampling-related keywords. These items should not be retried or persisted + will be "Telemetry sampled out." These items should not be retried or persisted because the server will always reject them based on sampling rules. - Checks for common sampling-related rejection messages from Breeze: - - "sampled out" - - "sampling" - - "filtered by sampling" - :param str message: Error message from the server :return: True if the error indicates a sampling rejection :rtype: bool """ if message is None: return False - lower_message = message.lower() - return ( - "sampled out" in lower_message - or "sampling" in lower_message - or "filtered by sampling" in lower_message - ) + return message == "Telemetry sampled out." _MONITOR_DOMAIN_MAPPING = { diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/test_base_exporter.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/test_base_exporter.py index 07b9f0550547..1d84bd901665 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/test_base_exporter.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/test_base_exporter.py @@ -746,17 +746,17 @@ def test_transmission_206_sampling_rejection_not_retried(self): items_received=3, items_accepted=0, errors=[ - # This item should NOT be retried due to sampling rejection (sampled out) + # This item should NOT be retried due to sampling rejection TelemetryErrorDetails( index=0, status_code=500, - message="Sampled out by ingestion sampling", + message="Telemetry sampled out.", ), - # This item should NOT be retried due to sampling rejection (filtered by sampling) + # This item should NOT be retried due to sampling rejection TelemetryErrorDetails( index=1, status_code=500, - message="Filtered by sampling policy", + message="Telemetry sampled out.", ), # This item should be retried (normal timeout error) TelemetryErrorDetails( @@ -791,12 +791,12 @@ def test_transmission_206_all_sampling_rejections_no_storage(self): TelemetryErrorDetails( index=0, status_code=500, - message="Sampled out by ingestion sampling", + message="Telemetry sampled out.", ), TelemetryErrorDetails( index=1, status_code=500, - message="Filtered by sampling policy", + message="Telemetry sampled out.", ), ], ) @@ -807,16 +807,7 @@ def test_transmission_206_all_sampling_rejections_no_storage(self): def test_is_sampling_rejection_true(self): """Test that _is_sampling_rejection correctly identifies sampling rejection messages.""" - # Test "sampled out" pattern - self.assertTrue(_is_sampling_rejection("Sampled out by ingestion sampling")) - self.assertTrue(_is_sampling_rejection("SAMPLED OUT")) - # Test "sampling" pattern - self.assertTrue(_is_sampling_rejection("Filtered by sampling policy")) - self.assertTrue(_is_sampling_rejection("Item rejected due to sampling")) - self.assertTrue(_is_sampling_rejection("SAMPLING")) - # Test "filtered by sampling" pattern - self.assertTrue(_is_sampling_rejection("filtered by sampling")) - self.assertTrue(_is_sampling_rejection("FILTERED BY SAMPLING")) + self.assertTrue(_is_sampling_rejection("Telemetry sampled out.")) def test_is_sampling_rejection_false(self): """Test that _is_sampling_rejection returns False for non-sampling messages.""" @@ -825,6 +816,10 @@ def test_is_sampling_rejection_false(self): self.assertFalse(_is_sampling_rejection("Timeout error")) self.assertFalse(_is_sampling_rejection("")) self.assertFalse(_is_sampling_rejection(None)) + # Similar messages that don't match exactly should return False + self.assertFalse(_is_sampling_rejection("Telemetry sampled out")) # Missing period + self.assertFalse(_is_sampling_rejection("telemetry sampled out.")) # Lowercase + self.assertFalse(_is_sampling_rejection("Sampled out")) def test_transmission_400(self): with mock.patch("requests.Session.request") as post: From 4e276201a81e54eb142b8eaa853fded2485fe406 Mon Sep 17 00:00:00 2001 From: Jackson Weber Date: Tue, 3 Feb 2026 12:55:41 -0800 Subject: [PATCH 4/5] Update CHANGELOG.md --- sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md b/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md index 44f860a1a97b..d3bea349fd9c 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md @@ -4,7 +4,7 @@ ### Other Changes - Fix Ingestion-Side Sampling Disk Persist Behavior - ([#37214]https://github.com/Azure/azure-sdk-for-js/pull/37214) + ([#44980]https://github.com/Azure/azure-sdk-for-python/pull/44980) ## 1.0.0b47 (2026-02-03) From 0922eef45d664d75e0232f8767416c231d1312c5 Mon Sep 17 00:00:00 2001 From: Jackson Weber Date: Tue, 3 Feb 2026 14:00:05 -0800 Subject: [PATCH 5/5] Updated message values. --- sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md | 2 +- .../azure/monitor/opentelemetry/exporter/export/_base.py | 2 +- .../tests/test_base_exporter.py | 4 +++- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md b/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md index d3bea349fd9c..12480ba55830 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md @@ -4,7 +4,7 @@ ### Other Changes - Fix Ingestion-Side Sampling Disk Persist Behavior - ([#44980]https://github.com/Azure/azure-sdk-for-python/pull/44980) + ([#44980](https://github.com/Azure/azure-sdk-for-python/pull/44980)) ## 1.0.0b47 (2026-02-03) diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/_base.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/_base.py index 6a7d77cfb8de..ebff4fef4812 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/_base.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/_base.py @@ -590,7 +590,7 @@ def _is_sampling_rejection(message: Optional[str]) -> bool: """ if message is None: return False - return message == "Telemetry sampled out." + return message.lower() == "telemetry sampled out." _MONITOR_DOMAIN_MAPPING = { diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/test_base_exporter.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/test_base_exporter.py index 1d84bd901665..097726261cfc 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/test_base_exporter.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/test_base_exporter.py @@ -808,6 +808,9 @@ def test_transmission_206_all_sampling_rejections_no_storage(self): def test_is_sampling_rejection_true(self): """Test that _is_sampling_rejection correctly identifies sampling rejection messages.""" self.assertTrue(_is_sampling_rejection("Telemetry sampled out.")) + self.assertTrue(_is_sampling_rejection("telemetry sampled out.")) + self.assertTrue(_is_sampling_rejection("TELEMETRY SAMPLED OUT.")) + self.assertTrue(_is_sampling_rejection("Telemetry Sampled Out.")) def test_is_sampling_rejection_false(self): """Test that _is_sampling_rejection returns False for non-sampling messages.""" @@ -818,7 +821,6 @@ def test_is_sampling_rejection_false(self): self.assertFalse(_is_sampling_rejection(None)) # Similar messages that don't match exactly should return False self.assertFalse(_is_sampling_rejection("Telemetry sampled out")) # Missing period - self.assertFalse(_is_sampling_rejection("telemetry sampled out.")) # Lowercase self.assertFalse(_is_sampling_rejection("Sampled out")) def test_transmission_400(self):