From 806760de0fb7d4fb86a37d886303e6c1e25e52df Mon Sep 17 00:00:00 2001 From: Vishwesh Bankwar Date: Tue, 28 Feb 2023 11:15:19 -0800 Subject: [PATCH 01/11] draft --- .../src/Internals/AzureMonitorTransmitter.cs | 8 +- .../src/Internals/HttpPipelineHelper.cs | 8 +- .../src/Internals/TransmissionState.cs | 15 +++ .../src/Internals/TransmissionStateManager.cs | 107 ++++++++++++++++++ 4 files changed, 130 insertions(+), 8 deletions(-) create mode 100644 sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/TransmissionState.cs create mode 100644 sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/TransmissionStateManager.cs diff --git a/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/AzureMonitorTransmitter.cs b/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/AzureMonitorTransmitter.cs index ab60483da59d..2d1010b7c23e 100644 --- a/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/AzureMonitorTransmitter.cs +++ b/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/AzureMonitorTransmitter.cs @@ -275,7 +275,7 @@ private ExportResult HandleFailures(HttpMessage httpMessage) content = HttpPipelineHelper.GetPartialContentForRetry(trackResponse, httpMessage.Request.Content); if (content != null) { - retryInterval = HttpPipelineHelper.GetRetryInterval(httpMessage.Response); + retryInterval = HttpPipelineHelper.GetRetryIntervalInMilliseconds(httpMessage.Response); result = _fileBlobProvider.SaveTelemetry(content, retryInterval); } break; @@ -285,7 +285,7 @@ private ExportResult HandleFailures(HttpMessage httpMessage) // Parse retry-after header // Send Messages To Storage content = HttpPipelineHelper.GetRequestContent(httpMessage.Request.Content); - retryInterval = HttpPipelineHelper.GetRetryInterval(httpMessage.Response); + retryInterval = HttpPipelineHelper.GetRetryIntervalInMilliseconds(httpMessage.Response); result = _fileBlobProvider.SaveTelemetry(content, retryInterval); break; case ResponseStatusCodes.Unauthorized: @@ -341,7 +341,7 @@ private void HandleFailures(HttpMessage httpMessage, PersistentBlob blob) var content = HttpPipelineHelper.GetPartialContentForRetry(trackResponse, httpMessage.Request.Content); if (content != null) { - retryInterval = HttpPipelineHelper.GetRetryInterval(httpMessage.Response); + retryInterval = HttpPipelineHelper.GetRetryIntervalInMilliseconds(httpMessage.Response); blob.TryDelete(); _fileBlobProvider?.SaveTelemetry(content, retryInterval); } @@ -351,7 +351,7 @@ private void HandleFailures(HttpMessage httpMessage, PersistentBlob blob) case ResponseStatusCodes.ResponseCodeTooManyRequestsAndRefreshCache: // Extend lease time using retry interval period // so that it is not picked up again before that. - retryInterval = HttpPipelineHelper.GetRetryInterval(httpMessage.Response); + retryInterval = HttpPipelineHelper.GetRetryIntervalInMilliseconds(httpMessage.Response); blob.TryLease(retryInterval); break; case ResponseStatusCodes.Unauthorized: diff --git a/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/HttpPipelineHelper.cs b/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/HttpPipelineHelper.cs index 4de61ed16788..a551d941b067 100644 --- a/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/HttpPipelineHelper.cs +++ b/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/HttpPipelineHelper.cs @@ -34,21 +34,21 @@ internal static TrackResponse GetTrackResponse(HttpMessage message) } } - internal static int GetRetryInterval(Response httpResponse) + internal static TimeSpan GetRetryInterval(Response httpResponse) { if (httpResponse != null && httpResponse.Headers.TryGetValue(RetryAfterHeaderName, out var retryAfterValue)) { if (int.TryParse(retryAfterValue, out var delaySeconds)) { - return (int)TimeSpan.FromSeconds(delaySeconds).TotalMilliseconds; + return TimeSpan.FromSeconds(delaySeconds); } if (DateTimeOffset.TryParse(retryAfterValue, out DateTimeOffset delayTime)) { - return (int)(delayTime - DateTimeOffset.Now).TotalMilliseconds; + return (delayTime - DateTimeOffset.Now); } } - return MinimumRetryInterval; + return TimeSpan.MinValue; } internal static byte[] GetRequestContent(RequestContent content) diff --git a/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/TransmissionState.cs b/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/TransmissionState.cs new file mode 100644 index 000000000000..bbbbb901282b --- /dev/null +++ b/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/TransmissionState.cs @@ -0,0 +1,15 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Collections.Generic; +using System.Text; + +namespace Azure.Monitor.OpenTelemetry.Exporter.Internals +{ + internal enum TransmissionState + { + Open, + Closed + } +} diff --git a/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/TransmissionStateManager.cs b/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/TransmissionStateManager.cs new file mode 100644 index 000000000000..9c797bdcffe1 --- /dev/null +++ b/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/TransmissionStateManager.cs @@ -0,0 +1,107 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Threading; + +namespace Azure.Monitor.OpenTelemetry.Exporter.Internals +{ + internal class TransmissionStateManager + { + private int _consecutiveErrors; + + private const int MaxDelayInMilliseconds = 3600000; + + internal const int MinDelayInMilliseconds = 10000; + + private readonly Random s_random = new(); + + private readonly TimeSpan _minIntervalToUpdateConsecutiveErrors = TimeSpan.FromMilliseconds(20000); + + private DateTimeOffset _nextMinTimeToUpdateConsecutiveErrors = DateTimeOffset.MinValue; + + private readonly System.Timers.Timer _timer; + + private double _syncBackOffIntervalCalculation; + + private double _syncconsecutiveErrorIncrement; + + private static TransmissionStateManager? s_transmissionStateManager; + + internal static TransmissionStateManager Instance => s_transmissionStateManager ??= new TransmissionStateManager(); + + internal TransmissionState State { get; private set; } + + private TransmissionStateManager() + { + _timer = new(); + _timer.Elapsed += RestartTransmission; + _timer.AutoReset = false; + State = TransmissionState.Open; + } + + /// + /// Prevents transmitting data to backend. + /// + private void CloseTransmission() + { + State = TransmissionState.Closed; + } + + /// + /// Re-enable transmitting telemetry to backend. + /// + /// + /// + internal void RestartTransmission(Object source, System.Timers.ElapsedEventArgs e) + { + _consecutiveErrors = 0; + // reset _sync so that the threads can close the transmission again if needed. + _syncBackOffIntervalCalculation = 0; + State = TransmissionState.Open; + } + + internal void SetBackOffTimeAndShutOffTransmission(Response? response) + { + if (Interlocked.Exchange(ref _syncconsecutiveErrorIncrement, 1) == 0) + { + // Do not increase number of errors more often than minimum interval (MinDelayInSeconds). + // since we have at most 4 senders (3 transmitters and one offline storage thread) and all of them most likely would fail if we have intermittent error. + if (DateTimeOffset.UtcNow > _nextMinTimeToUpdateConsecutiveErrors) + { + _consecutiveErrors++; + _nextMinTimeToUpdateConsecutiveErrors = DateTimeOffset.UtcNow + _minIntervalToUpdateConsecutiveErrors; + } + + Interlocked.Exchange(ref _syncconsecutiveErrorIncrement, 0); + } + + if (Interlocked.Exchange(ref _syncBackOffIntervalCalculation, 1) == 0) + { + // if backend responded with a retryAfter header we will use it + // else we will calculate by increasing time interval exponentially. + var retryAfterInterval = HttpPipelineHelper.GetRetryInterval(response); + var backOffTimeInterval = retryAfterInterval != TimeSpan.MinValue ? retryAfterInterval : GetBackOffTimeInterval(); + + CloseTransmission(); + + _timer.Interval = backOffTimeInterval.TotalMilliseconds; + + _timer.Start(); + } + } + + internal TimeSpan GetBackOffTimeInterval() + { + double delayInMilliseconds = MinDelayInMilliseconds; + if (_consecutiveErrors > 1) + { + double backOffSlot = (Math.Pow(2, _consecutiveErrors) - 1) / 2; + var backOffDelay = s_random.Next(1, (int)Math.Min(backOffSlot * MinDelayInMilliseconds, int.MaxValue)); + delayInMilliseconds = Math.Max(Math.Min(backOffDelay, MaxDelayInMilliseconds), MinDelayInMilliseconds); + } + + return TimeSpan.FromMilliseconds(delayInMilliseconds); + } + } +} From 1ce397cb16baee59732322b9b423858a935a9c5b Mon Sep 17 00:00:00 2001 From: Vishwesh Bankwar Date: Tue, 14 Mar 2023 14:44:25 -0700 Subject: [PATCH 02/11] Transmission state mgr for exponential backoff --- .../src/Internals/AzureMonitorTransmitter.cs | 8 +-- .../src/Internals/HttpPipelineHelper.cs | 19 ++++++- .../src/Internals/TransmissionStateManager.cs | 57 +++++++------------ 3 files changed, 44 insertions(+), 40 deletions(-) diff --git a/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/AzureMonitorTransmitter.cs b/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/AzureMonitorTransmitter.cs index 01e5d62aa132..78ca0dd52090 100644 --- a/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/AzureMonitorTransmitter.cs +++ b/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/AzureMonitorTransmitter.cs @@ -277,7 +277,7 @@ private ExportResult HandleFailures(HttpMessage httpMessage) content = HttpPipelineHelper.GetPartialContentForRetry(trackResponse, httpMessage.Request.Content); if (content != null) { - retryInterval = HttpPipelineHelper.GetRetryIntervalInMilliseconds(httpMessage.Response); + retryInterval = HttpPipelineHelper.GetRetryInterval(httpMessage.Response); result = _fileBlobProvider.SaveTelemetry(content, retryInterval); } break; @@ -287,7 +287,7 @@ private ExportResult HandleFailures(HttpMessage httpMessage) // Parse retry-after header // Send Messages To Storage content = HttpPipelineHelper.GetRequestContent(httpMessage.Request.Content); - retryInterval = HttpPipelineHelper.GetRetryIntervalInMilliseconds(httpMessage.Response); + retryInterval = HttpPipelineHelper.GetRetryInterval(httpMessage.Response); result = _fileBlobProvider.SaveTelemetry(content, retryInterval); break; case ResponseStatusCodes.Unauthorized: @@ -343,7 +343,7 @@ private void HandleFailures(HttpMessage httpMessage, PersistentBlob blob) var content = HttpPipelineHelper.GetPartialContentForRetry(trackResponse, httpMessage.Request.Content); if (content != null) { - retryInterval = HttpPipelineHelper.GetRetryIntervalInMilliseconds(httpMessage.Response); + retryInterval = HttpPipelineHelper.GetRetryInterval(httpMessage.Response); blob.TryDelete(); _fileBlobProvider?.SaveTelemetry(content, retryInterval); } @@ -353,7 +353,7 @@ private void HandleFailures(HttpMessage httpMessage, PersistentBlob blob) case ResponseStatusCodes.ResponseCodeTooManyRequestsAndRefreshCache: // Extend lease time using retry interval period // so that it is not picked up again before that. - retryInterval = HttpPipelineHelper.GetRetryIntervalInMilliseconds(httpMessage.Response); + retryInterval = HttpPipelineHelper.GetRetryInterval(httpMessage.Response); blob.TryLease(retryInterval); break; case ResponseStatusCodes.Unauthorized: diff --git a/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/HttpPipelineHelper.cs b/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/HttpPipelineHelper.cs index a551d941b067..260f5c458004 100644 --- a/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/HttpPipelineHelper.cs +++ b/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/HttpPipelineHelper.cs @@ -34,7 +34,24 @@ internal static TrackResponse GetTrackResponse(HttpMessage message) } } - internal static TimeSpan GetRetryInterval(Response httpResponse) + internal static int GetRetryInterval(Response httpResponse) + { + if (httpResponse != null && httpResponse.Headers.TryGetValue(RetryAfterHeaderName, out var retryAfterValue)) + { + if (int.TryParse(retryAfterValue, out var delaySeconds)) + { + return (int)TimeSpan.FromSeconds(delaySeconds).TotalMilliseconds; + } + if (DateTimeOffset.TryParse(retryAfterValue, out DateTimeOffset delayTime)) + { + return (int)(delayTime - DateTimeOffset.Now).TotalMilliseconds; + } + } + + return MinimumRetryInterval; + } + + internal static TimeSpan GetRetryIntervalTimespan(Response httpResponse) { if (httpResponse != null && httpResponse.Headers.TryGetValue(RetryAfterHeaderName, out var retryAfterValue)) { diff --git a/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/TransmissionStateManager.cs b/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/TransmissionStateManager.cs index 9c797bdcffe1..2810046b1136 100644 --- a/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/TransmissionStateManager.cs +++ b/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/TransmissionStateManager.cs @@ -10,11 +10,11 @@ internal class TransmissionStateManager { private int _consecutiveErrors; - private const int MaxDelayInMilliseconds = 3600000; + private const long MaxDelayInMilliseconds = 3600000; - internal const int MinDelayInMilliseconds = 10000; + internal const long MinDelayInMilliseconds = 10000; - private readonly Random s_random = new(); + private readonly Random _random = new(); private readonly TimeSpan _minIntervalToUpdateConsecutiveErrors = TimeSpan.FromMilliseconds(20000); @@ -24,12 +24,6 @@ internal class TransmissionStateManager private double _syncBackOffIntervalCalculation; - private double _syncconsecutiveErrorIncrement; - - private static TransmissionStateManager? s_transmissionStateManager; - - internal static TransmissionStateManager Instance => s_transmissionStateManager ??= new TransmissionStateManager(); - internal TransmissionState State { get; private set; } private TransmissionStateManager() @@ -37,57 +31,50 @@ private TransmissionStateManager() _timer = new(); _timer.Elapsed += RestartTransmission; _timer.AutoReset = false; - State = TransmissionState.Open; + State = TransmissionState.Closed; } /// /// Prevents transmitting data to backend. /// - private void CloseTransmission() + private void OpenTransmission() { - State = TransmissionState.Closed; + State = TransmissionState.Open; } - /// - /// Re-enable transmitting telemetry to backend. - /// - /// - /// internal void RestartTransmission(Object source, System.Timers.ElapsedEventArgs e) { _consecutiveErrors = 0; + // reset _sync so that the threads can close the transmission again if needed. _syncBackOffIntervalCalculation = 0; - State = TransmissionState.Open; + State = TransmissionState.Closed; } - internal void SetBackOffTimeAndShutOffTransmission(Response? response) + internal void EnableBackOff(Response? response) { - if (Interlocked.Exchange(ref _syncconsecutiveErrorIncrement, 1) == 0) + if (Interlocked.Exchange(ref _syncBackOffIntervalCalculation, 1) == 0) { - // Do not increase number of errors more often than minimum interval (MinDelayInSeconds). - // since we have at most 4 senders (3 transmitters and one offline storage thread) and all of them most likely would fail if we have intermittent error. + // Do not increase number of errors more often than minimum interval (MinDelayInMilliseconds). + // since we have can have 4 parallel transmissions (logs, metrics, traces and offline storage tranmission) and all of them most likely would fail if we have intermittent error. if (DateTimeOffset.UtcNow > _nextMinTimeToUpdateConsecutiveErrors) { _consecutiveErrors++; _nextMinTimeToUpdateConsecutiveErrors = DateTimeOffset.UtcNow + _minIntervalToUpdateConsecutiveErrors; - } - Interlocked.Exchange(ref _syncconsecutiveErrorIncrement, 0); - } + // If backend responded with a retryAfter header we will use it + // else we will calculate by increasing time interval exponentially. + var retryAfterInterval = HttpPipelineHelper.GetRetryIntervalTimespan(response); + var backOffTimeInterval = retryAfterInterval != TimeSpan.MinValue ? retryAfterInterval : GetBackOffTimeInterval(); - if (Interlocked.Exchange(ref _syncBackOffIntervalCalculation, 1) == 0) - { - // if backend responded with a retryAfter header we will use it - // else we will calculate by increasing time interval exponentially. - var retryAfterInterval = HttpPipelineHelper.GetRetryInterval(response); - var backOffTimeInterval = retryAfterInterval != TimeSpan.MinValue ? retryAfterInterval : GetBackOffTimeInterval(); + OpenTransmission(); - CloseTransmission(); + _timer.Interval = backOffTimeInterval.TotalMilliseconds; - _timer.Interval = backOffTimeInterval.TotalMilliseconds; + _timer.Start(); + } - _timer.Start(); + Interlocked.Exchange(ref _syncBackOffIntervalCalculation, 0); } } @@ -97,7 +84,7 @@ internal TimeSpan GetBackOffTimeInterval() if (_consecutiveErrors > 1) { double backOffSlot = (Math.Pow(2, _consecutiveErrors) - 1) / 2; - var backOffDelay = s_random.Next(1, (int)Math.Min(backOffSlot * MinDelayInMilliseconds, int.MaxValue)); + var backOffDelay = _random.Next(1, (int)Math.Min(backOffSlot * MinDelayInMilliseconds, int.MaxValue)); delayInMilliseconds = Math.Max(Math.Min(backOffDelay, MaxDelayInMilliseconds), MinDelayInMilliseconds); } From 14a7b40cccc6cb7b23272892375e9033e270df7a Mon Sep 17 00:00:00 2001 From: Vishwesh Bankwar Date: Tue, 14 Mar 2023 15:03:32 -0700 Subject: [PATCH 03/11] fix comment --- .../src/Internals/TransmissionStateManager.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/TransmissionStateManager.cs b/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/TransmissionStateManager.cs index 2810046b1136..4127178b8099 100644 --- a/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/TransmissionStateManager.cs +++ b/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/TransmissionStateManager.cs @@ -56,7 +56,7 @@ internal void EnableBackOff(Response? response) if (Interlocked.Exchange(ref _syncBackOffIntervalCalculation, 1) == 0) { // Do not increase number of errors more often than minimum interval (MinDelayInMilliseconds). - // since we have can have 4 parallel transmissions (logs, metrics, traces and offline storage tranmission) and all of them most likely would fail if we have intermittent error. + // since we can have 4 parallel transmissions (logs, metrics, traces and offline storage tranmission) and all of them most likely would fail if we have intermittent error. if (DateTimeOffset.UtcNow > _nextMinTimeToUpdateConsecutiveErrors) { _consecutiveErrors++; From 7763072953eeedfd1fa2315830cfac26d9ff2d9c Mon Sep 17 00:00:00 2001 From: Vishwesh Bankwar Date: Tue, 14 Mar 2023 15:09:14 -0700 Subject: [PATCH 04/11] fix comment --- .../src/Internals/TransmissionStateManager.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/TransmissionStateManager.cs b/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/TransmissionStateManager.cs index 4127178b8099..09910adf2f4c 100644 --- a/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/TransmissionStateManager.cs +++ b/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/TransmissionStateManager.cs @@ -46,7 +46,7 @@ internal void RestartTransmission(Object source, System.Timers.ElapsedEventArgs { _consecutiveErrors = 0; - // reset _sync so that the threads can close the transmission again if needed. + // Reset _sync so that the threads can open the transmission again if needed. _syncBackOffIntervalCalculation = 0; State = TransmissionState.Closed; } From 31ba30402192d509b2bc5ae25851ed21b67097b6 Mon Sep 17 00:00:00 2001 From: Vishwesh Bankwar Date: Tue, 14 Mar 2023 15:10:23 -0700 Subject: [PATCH 05/11] refactor --- .../src/Internals/TransmissionStateManager.cs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/TransmissionStateManager.cs b/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/TransmissionStateManager.cs index 09910adf2f4c..5ec074f5168c 100644 --- a/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/TransmissionStateManager.cs +++ b/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/TransmissionStateManager.cs @@ -20,7 +20,7 @@ internal class TransmissionStateManager private DateTimeOffset _nextMinTimeToUpdateConsecutiveErrors = DateTimeOffset.MinValue; - private readonly System.Timers.Timer _timer; + private readonly System.Timers.Timer _backOffIntervalTimer; private double _syncBackOffIntervalCalculation; @@ -28,9 +28,9 @@ internal class TransmissionStateManager private TransmissionStateManager() { - _timer = new(); - _timer.Elapsed += RestartTransmission; - _timer.AutoReset = false; + _backOffIntervalTimer = new(); + _backOffIntervalTimer.Elapsed += RestartTransmission; + _backOffIntervalTimer.AutoReset = false; State = TransmissionState.Closed; } @@ -69,9 +69,9 @@ internal void EnableBackOff(Response? response) OpenTransmission(); - _timer.Interval = backOffTimeInterval.TotalMilliseconds; + _backOffIntervalTimer.Interval = backOffTimeInterval.TotalMilliseconds; - _timer.Start(); + _backOffIntervalTimer.Start(); } Interlocked.Exchange(ref _syncBackOffIntervalCalculation, 0); From b708e297a4b7a3fe0755639637fb1dcd4a8b512a Mon Sep 17 00:00:00 2001 From: Vishwesh Bankwar Date: Wed, 15 Mar 2023 11:08:52 -0700 Subject: [PATCH 06/11] address PR comments --- .../src/Internals/HttpPipelineHelper.cs | 10 +-- .../src/Internals/TransmissionState.cs | 11 +-- .../src/Internals/TransmissionStateManager.cs | 72 ++++++++++++------- 3 files changed, 60 insertions(+), 33 deletions(-) diff --git a/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/HttpPipelineHelper.cs b/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/HttpPipelineHelper.cs index 260f5c458004..ef35263d83ec 100644 --- a/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/HttpPipelineHelper.cs +++ b/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/HttpPipelineHelper.cs @@ -51,21 +51,23 @@ internal static int GetRetryInterval(Response httpResponse) return MinimumRetryInterval; } - internal static TimeSpan GetRetryIntervalTimespan(Response httpResponse) + internal static bool TryGetRetryIntervalTimespan(Response httpResponse, out TimeSpan retryAfter) { if (httpResponse != null && httpResponse.Headers.TryGetValue(RetryAfterHeaderName, out var retryAfterValue)) { if (int.TryParse(retryAfterValue, out var delaySeconds)) { - return TimeSpan.FromSeconds(delaySeconds); + retryAfter = TimeSpan.FromSeconds(delaySeconds); + return true; } if (DateTimeOffset.TryParse(retryAfterValue, out DateTimeOffset delayTime)) { - return (delayTime - DateTimeOffset.Now); + retryAfter = (delayTime - DateTimeOffset.Now); + return true; } } - return TimeSpan.MinValue; + return false; } internal static byte[] GetRequestContent(RequestContent content) diff --git a/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/TransmissionState.cs b/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/TransmissionState.cs index bbbbb901282b..3e4e078ce3c3 100644 --- a/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/TransmissionState.cs +++ b/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/TransmissionState.cs @@ -1,15 +1,18 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT License. -using System; -using System.Collections.Generic; -using System.Text; - namespace Azure.Monitor.OpenTelemetry.Exporter.Internals { internal enum TransmissionState { + /// + /// Represents disabled transmission. + /// Open, + + /// + /// Represents enabled transmission. + /// Closed } } diff --git a/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/TransmissionStateManager.cs b/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/TransmissionStateManager.cs index 5ec074f5168c..a1a23cda0186 100644 --- a/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/TransmissionStateManager.cs +++ b/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/TransmissionStateManager.cs @@ -8,70 +8,92 @@ namespace Azure.Monitor.OpenTelemetry.Exporter.Internals { internal class TransmissionStateManager { - private int _consecutiveErrors; - - private const long MaxDelayInMilliseconds = 3600000; + private const int MaxDelayInSeconds = 3600; - internal const long MinDelayInMilliseconds = 10000; + private const int MinDelayInSeconds = 10; private readonly Random _random = new(); - private readonly TimeSpan _minIntervalToUpdateConsecutiveErrors = TimeSpan.FromMilliseconds(20000); + /// + /// Minimum time interval between failures to increment consecutive error count. + /// + private TimeSpan _minIntervalToUpdateConsecutiveErrors = TimeSpan.FromSeconds(20); + /// + /// Time threshold after which consecutive error count can be incremented. + /// private DateTimeOffset _nextMinTimeToUpdateConsecutiveErrors = DateTimeOffset.MinValue; private readonly System.Timers.Timer _backOffIntervalTimer; private double _syncBackOffIntervalCalculation; + private int _consecutiveErrors; + internal TransmissionState State { get; private set; } - private TransmissionStateManager() + internal TransmissionStateManager() { _backOffIntervalTimer = new(); - _backOffIntervalTimer.Elapsed += RestartTransmission; + _backOffIntervalTimer.Elapsed += ResetTransmission; _backOffIntervalTimer.AutoReset = false; State = TransmissionState.Closed; } /// - /// Prevents transmitting data to backend. + /// Stops transmitting data to backend. /// private void OpenTransmission() { State = TransmissionState.Open; } - internal void RestartTransmission(Object source, System.Timers.ElapsedEventArgs e) + /// + /// Enable transmitting data to backend. + /// To be called for each successful request or after back-off interval expiration. + /// + internal void CloseTransmision() { - _consecutiveErrors = 0; - - // Reset _sync so that the threads can open the transmission again if needed. - _syncBackOffIntervalCalculation = 0; State = TransmissionState.Closed; } + /// + /// Resets consecutive error count. + /// To be called for each successful request. + /// + internal void ResetConsecutiveErrors() + { + Interlocked.Exchange(ref _consecutiveErrors, 0); + } + + internal void ResetTransmission(object source, System.Timers.ElapsedEventArgs e) + { + CloseTransmision(); + } + internal void EnableBackOff(Response? response) { if (Interlocked.Exchange(ref _syncBackOffIntervalCalculation, 1) == 0) { - // Do not increase number of errors more often than minimum interval (MinDelayInMilliseconds). + // Do not increase number of errors more often than minimum interval. // since we can have 4 parallel transmissions (logs, metrics, traces and offline storage tranmission) and all of them most likely would fail if we have intermittent error. if (DateTimeOffset.UtcNow > _nextMinTimeToUpdateConsecutiveErrors) { - _consecutiveErrors++; + Interlocked.Increment(ref _consecutiveErrors); _nextMinTimeToUpdateConsecutiveErrors = DateTimeOffset.UtcNow + _minIntervalToUpdateConsecutiveErrors; // If backend responded with a retryAfter header we will use it // else we will calculate by increasing time interval exponentially. - var retryAfterInterval = HttpPipelineHelper.GetRetryIntervalTimespan(response); - var backOffTimeInterval = retryAfterInterval != TimeSpan.MinValue ? retryAfterInterval : GetBackOffTimeInterval(); + var backOffTimeInterval = HttpPipelineHelper.TryGetRetryIntervalTimespan(response, out var retryAfterInterval) ? retryAfterInterval : GetBackOffTimeInterval(); - OpenTransmission(); + if (backOffTimeInterval > TimeSpan.Zero) + { + OpenTransmission(); - _backOffIntervalTimer.Interval = backOffTimeInterval.TotalMilliseconds; + _backOffIntervalTimer.Interval = backOffTimeInterval.TotalMilliseconds; - _backOffIntervalTimer.Start(); + _backOffIntervalTimer.Start(); + } } Interlocked.Exchange(ref _syncBackOffIntervalCalculation, 0); @@ -80,15 +102,15 @@ internal void EnableBackOff(Response? response) internal TimeSpan GetBackOffTimeInterval() { - double delayInMilliseconds = MinDelayInMilliseconds; - if (_consecutiveErrors > 1) + double delayInSeconds = 0; + if (_consecutiveErrors > 0) { double backOffSlot = (Math.Pow(2, _consecutiveErrors) - 1) / 2; - var backOffDelay = _random.Next(1, (int)Math.Min(backOffSlot * MinDelayInMilliseconds, int.MaxValue)); - delayInMilliseconds = Math.Max(Math.Min(backOffDelay, MaxDelayInMilliseconds), MinDelayInMilliseconds); + var backOffDelay = _random.Next(1, (int)Math.Min(backOffSlot * MinDelayInSeconds, int.MaxValue)); + delayInSeconds = Math.Max(Math.Min(backOffDelay, MaxDelayInSeconds), MinDelayInSeconds); } - return TimeSpan.FromMilliseconds(delayInMilliseconds); + return TimeSpan.FromSeconds(delayInSeconds); } } } From d929740145d9b8a2b4692e4e25d549d9b8789a09 Mon Sep 17 00:00:00 2001 From: Vishwesh Bankwar Date: Wed, 15 Mar 2023 11:28:37 -0700 Subject: [PATCH 07/11] fix typo --- .../src/Internals/TransmissionStateManager.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/TransmissionStateManager.cs b/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/TransmissionStateManager.cs index a1a23cda0186..a2fd701db3f1 100644 --- a/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/TransmissionStateManager.cs +++ b/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/TransmissionStateManager.cs @@ -52,7 +52,7 @@ private void OpenTransmission() /// Enable transmitting data to backend. /// To be called for each successful request or after back-off interval expiration. /// - internal void CloseTransmision() + internal void CloseTransmission() { State = TransmissionState.Closed; } @@ -68,7 +68,7 @@ internal void ResetConsecutiveErrors() internal void ResetTransmission(object source, System.Timers.ElapsedEventArgs e) { - CloseTransmision(); + CloseTransmission(); } internal void EnableBackOff(Response? response) From bf3055c2be20d9b9dbc27ccfe6f0f0f00d15a1cf Mon Sep 17 00:00:00 2001 From: Vishwesh Bankwar Date: Wed, 15 Mar 2023 11:39:26 -0700 Subject: [PATCH 08/11] add summary for back off time interval calculation --- .../src/Internals/TransmissionStateManager.cs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/TransmissionStateManager.cs b/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/TransmissionStateManager.cs index a2fd701db3f1..e90a8785e86c 100644 --- a/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/TransmissionStateManager.cs +++ b/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/TransmissionStateManager.cs @@ -100,6 +100,12 @@ internal void EnableBackOff(Response? response) } } + /// + /// Calculates the time interval for which the transmission should be halted. + /// Number of consecutive errors are taken in to account to increase the time. + /// Random variation is introduced in order to avoid collision. + /// + /// BackOff time interval internal TimeSpan GetBackOffTimeInterval() { double delayInSeconds = 0; From 9a3878f83cc7636c79b1903db91138c77fe43723 Mon Sep 17 00:00:00 2001 From: Vishwesh Bankwar Date: Wed, 15 Mar 2023 11:49:41 -0700 Subject: [PATCH 09/11] fix comment --- .../src/Internals/TransmissionStateManager.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/TransmissionStateManager.cs b/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/TransmissionStateManager.cs index e90a8785e86c..15c6d22928ec 100644 --- a/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/TransmissionStateManager.cs +++ b/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/TransmissionStateManager.cs @@ -105,7 +105,7 @@ internal void EnableBackOff(Response? response) /// Number of consecutive errors are taken in to account to increase the time. /// Random variation is introduced in order to avoid collision. /// - /// BackOff time interval + /// BackOff time interval. internal TimeSpan GetBackOffTimeInterval() { double delayInSeconds = 0; From 70a8bd1f0c6499c6d2780885a14ff244db1374e1 Mon Sep 17 00:00:00 2001 From: Vishwesh Bankwar Date: Wed, 15 Mar 2023 13:32:53 -0700 Subject: [PATCH 10/11] add ctr for tests and unit tests --- .../src/Internals/TransmissionStateManager.cs | 22 ++++++++++ .../TransmissionStateManagerTests.cs | 43 +++++++++++++++++++ 2 files changed, 65 insertions(+) create mode 100644 sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/tests/Azure.Monitor.OpenTelemetry.Exporter.Tests/TransmissionStateManagerTests.cs diff --git a/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/TransmissionStateManager.cs b/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/TransmissionStateManager.cs index 15c6d22928ec..6009e579641c 100644 --- a/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/TransmissionStateManager.cs +++ b/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/TransmissionStateManager.cs @@ -40,6 +40,28 @@ internal TransmissionStateManager() State = TransmissionState.Closed; } + /// + /// For test purposes. + /// + /// + /// + /// + /// + /// + internal TransmissionStateManager( + Random random, + TimeSpan minIntervalToUpdateConsecutiveErrors, + DateTimeOffset nextMinTimeToUpdateConsecutiveErrors, + System.Timers.Timer backOffIntervalTimer, + TransmissionState state) + { + _random = random; + _minIntervalToUpdateConsecutiveErrors = minIntervalToUpdateConsecutiveErrors; + _nextMinTimeToUpdateConsecutiveErrors = nextMinTimeToUpdateConsecutiveErrors; + _backOffIntervalTimer = backOffIntervalTimer; + State = state; + } + /// /// Stops transmitting data to backend. /// diff --git a/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/tests/Azure.Monitor.OpenTelemetry.Exporter.Tests/TransmissionStateManagerTests.cs b/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/tests/Azure.Monitor.OpenTelemetry.Exporter.Tests/TransmissionStateManagerTests.cs new file mode 100644 index 000000000000..b71f44f48258 --- /dev/null +++ b/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/tests/Azure.Monitor.OpenTelemetry.Exporter.Tests/TransmissionStateManagerTests.cs @@ -0,0 +1,43 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using Azure.Core.TestFramework; +using Azure.Monitor.OpenTelemetry.Exporter.Internals; +using Xunit; + +namespace Azure.Monitor.OpenTelemetry.Exporter.Tests +{ + public class TransmissionStateManagerTests + { + [Fact] + public void EnableBackOffSetsStateToOpen() + { + var transmissionStateManager = new TransmissionStateManager(); + MockResponse mockResponse = new MockResponse(500, "Internal Server Error"); + transmissionStateManager.EnableBackOff(mockResponse); + + Assert.Equal(TransmissionState.Open, transmissionStateManager.State); + } + + [Fact] + public void EnableBackOffSetsStateToOpenUsingRetryAfterHeaderInResponse() + { + System.Timers.Timer backOffIntervalTimer = new(); + var transmissionStateManager = new TransmissionStateManager( + random: new(), + minIntervalToUpdateConsecutiveErrors: TimeSpan.FromSeconds(20), + nextMinTimeToUpdateConsecutiveErrors: DateTimeOffset.MinValue, + backOffIntervalTimer: backOffIntervalTimer, + TransmissionState.Closed + ); + + MockResponse mockResponse = new MockResponse(429, "Internal Server Error"); + mockResponse.AddHeader("Retry-After", "20"); + transmissionStateManager.EnableBackOff(mockResponse); + + Assert.Equal(20000, backOffIntervalTimer.Interval); + Assert.Equal(TransmissionState.Open, transmissionStateManager.State); + } + } +} From 5e5072e765e5f62dab514cf44a9c9568a2064d96 Mon Sep 17 00:00:00 2001 From: Vishwesh Bankwar Date: Wed, 15 Mar 2023 15:26:48 -0700 Subject: [PATCH 11/11] add timer dispose --- .../src/Internals/HttpPipelineHelper.cs | 2 +- .../src/Internals/TransmissionStateManager.cs | 23 ++++++++++++++++++- 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/HttpPipelineHelper.cs b/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/HttpPipelineHelper.cs index 9a6b56194cad..5e26b4ab9fb4 100644 --- a/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/HttpPipelineHelper.cs +++ b/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/HttpPipelineHelper.cs @@ -49,7 +49,7 @@ internal static int GetRetryInterval(Response httpResponse) return MinimumRetryInterval; } - internal static bool TryGetRetryIntervalTimespan(Response httpResponse, out TimeSpan retryAfter) + internal static bool TryGetRetryIntervalTimespan(Response? httpResponse, out TimeSpan retryAfter) { if (httpResponse != null && httpResponse.Headers.TryGetValue(RetryAfterHeaderName, out var retryAfterValue)) { diff --git a/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/TransmissionStateManager.cs b/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/TransmissionStateManager.cs index 6009e579641c..8d6483cabd0b 100644 --- a/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/TransmissionStateManager.cs +++ b/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/TransmissionStateManager.cs @@ -6,7 +6,7 @@ namespace Azure.Monitor.OpenTelemetry.Exporter.Internals { - internal class TransmissionStateManager + internal class TransmissionStateManager : IDisposable { private const int MaxDelayInSeconds = 3600; @@ -29,6 +29,7 @@ internal class TransmissionStateManager private double _syncBackOffIntervalCalculation; private int _consecutiveErrors; + private bool _disposed; internal TransmissionState State { get; private set; } @@ -140,5 +141,25 @@ internal TimeSpan GetBackOffTimeInterval() return TimeSpan.FromSeconds(delayInSeconds); } + + protected virtual void Dispose(bool disposing) + { + if (!_disposed) + { + if (disposing) + { + _backOffIntervalTimer?.Dispose(); + } + + _disposed = true; + } + } + + public void Dispose() + { + // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method + Dispose(disposing: true); + GC.SuppressFinalize(this); + } } }