diff --git a/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/HttpPipelineHelper.cs b/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/HttpPipelineHelper.cs index 7b080a85dbd1..5e26b4ab9fb4 100644 --- a/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/HttpPipelineHelper.cs +++ b/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/HttpPipelineHelper.cs @@ -49,6 +49,25 @@ internal static int GetRetryInterval(Response httpResponse) return MinimumRetryInterval; } + internal static bool TryGetRetryIntervalTimespan(Response? httpResponse, out TimeSpan retryAfter) + { + if (httpResponse != null && httpResponse.Headers.TryGetValue(RetryAfterHeaderName, out var retryAfterValue)) + { + if (int.TryParse(retryAfterValue, out var delaySeconds)) + { + retryAfter = TimeSpan.FromSeconds(delaySeconds); + return true; + } + if (DateTimeOffset.TryParse(retryAfterValue, out DateTimeOffset delayTime)) + { + retryAfter = (delayTime - DateTimeOffset.Now); + return true; + } + } + + return false; + } + internal static byte[]? GetRequestContent(RequestContent? content) { if (content == null) diff --git a/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/TransmissionState.cs b/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/TransmissionState.cs new file mode 100644 index 000000000000..3e4e078ce3c3 --- /dev/null +++ b/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/TransmissionState.cs @@ -0,0 +1,18 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +namespace Azure.Monitor.OpenTelemetry.Exporter.Internals +{ + internal enum TransmissionState + { + /// + /// Represents disabled transmission. + /// + Open, + + /// + /// Represents enabled transmission. + /// + Closed + } +} diff --git a/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/TransmissionStateManager.cs b/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/TransmissionStateManager.cs new file mode 100644 index 000000000000..8d6483cabd0b --- /dev/null +++ b/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/TransmissionStateManager.cs @@ -0,0 +1,165 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Threading; + +namespace Azure.Monitor.OpenTelemetry.Exporter.Internals +{ + internal class TransmissionStateManager : IDisposable + { + private const int MaxDelayInSeconds = 3600; + + private const int MinDelayInSeconds = 10; + + private readonly Random _random = new(); + + /// + /// Minimum time interval between failures to increment consecutive error count. + /// + private TimeSpan _minIntervalToUpdateConsecutiveErrors = TimeSpan.FromSeconds(20); + + /// + /// Time threshold after which consecutive error count can be incremented. + /// + private DateTimeOffset _nextMinTimeToUpdateConsecutiveErrors = DateTimeOffset.MinValue; + + private readonly System.Timers.Timer _backOffIntervalTimer; + + private double _syncBackOffIntervalCalculation; + + private int _consecutiveErrors; + private bool _disposed; + + internal TransmissionState State { get; private set; } + + internal TransmissionStateManager() + { + _backOffIntervalTimer = new(); + _backOffIntervalTimer.Elapsed += ResetTransmission; + _backOffIntervalTimer.AutoReset = false; + State = TransmissionState.Closed; + } + + /// + /// For test purposes. + /// + /// + /// + /// + /// + /// + internal TransmissionStateManager( + Random random, + TimeSpan minIntervalToUpdateConsecutiveErrors, + DateTimeOffset nextMinTimeToUpdateConsecutiveErrors, + System.Timers.Timer backOffIntervalTimer, + TransmissionState state) + { + _random = random; + _minIntervalToUpdateConsecutiveErrors = minIntervalToUpdateConsecutiveErrors; + _nextMinTimeToUpdateConsecutiveErrors = nextMinTimeToUpdateConsecutiveErrors; + _backOffIntervalTimer = backOffIntervalTimer; + State = state; + } + + /// + /// Stops transmitting data to backend. + /// + private void OpenTransmission() + { + State = TransmissionState.Open; + } + + /// + /// Enable transmitting data to backend. + /// To be called for each successful request or after back-off interval expiration. + /// + internal void CloseTransmission() + { + State = TransmissionState.Closed; + } + + /// + /// Resets consecutive error count. + /// To be called for each successful request. + /// + internal void ResetConsecutiveErrors() + { + Interlocked.Exchange(ref _consecutiveErrors, 0); + } + + internal void ResetTransmission(object source, System.Timers.ElapsedEventArgs e) + { + CloseTransmission(); + } + + internal void EnableBackOff(Response? response) + { + if (Interlocked.Exchange(ref _syncBackOffIntervalCalculation, 1) == 0) + { + // Do not increase number of errors more often than minimum interval. + // since we can have 4 parallel transmissions (logs, metrics, traces and offline storage tranmission) and all of them most likely would fail if we have intermittent error. + if (DateTimeOffset.UtcNow > _nextMinTimeToUpdateConsecutiveErrors) + { + Interlocked.Increment(ref _consecutiveErrors); + _nextMinTimeToUpdateConsecutiveErrors = DateTimeOffset.UtcNow + _minIntervalToUpdateConsecutiveErrors; + + // If backend responded with a retryAfter header we will use it + // else we will calculate by increasing time interval exponentially. + var backOffTimeInterval = HttpPipelineHelper.TryGetRetryIntervalTimespan(response, out var retryAfterInterval) ? retryAfterInterval : GetBackOffTimeInterval(); + + if (backOffTimeInterval > TimeSpan.Zero) + { + OpenTransmission(); + + _backOffIntervalTimer.Interval = backOffTimeInterval.TotalMilliseconds; + + _backOffIntervalTimer.Start(); + } + } + + Interlocked.Exchange(ref _syncBackOffIntervalCalculation, 0); + } + } + + /// + /// Calculates the time interval for which the transmission should be halted. + /// Number of consecutive errors are taken in to account to increase the time. + /// Random variation is introduced in order to avoid collision. + /// + /// BackOff time interval. + internal TimeSpan GetBackOffTimeInterval() + { + double delayInSeconds = 0; + if (_consecutiveErrors > 0) + { + double backOffSlot = (Math.Pow(2, _consecutiveErrors) - 1) / 2; + var backOffDelay = _random.Next(1, (int)Math.Min(backOffSlot * MinDelayInSeconds, int.MaxValue)); + delayInSeconds = Math.Max(Math.Min(backOffDelay, MaxDelayInSeconds), MinDelayInSeconds); + } + + return TimeSpan.FromSeconds(delayInSeconds); + } + + protected virtual void Dispose(bool disposing) + { + if (!_disposed) + { + if (disposing) + { + _backOffIntervalTimer?.Dispose(); + } + + _disposed = true; + } + } + + public void Dispose() + { + // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method + Dispose(disposing: true); + GC.SuppressFinalize(this); + } + } +} diff --git a/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/tests/Azure.Monitor.OpenTelemetry.Exporter.Tests/TransmissionStateManagerTests.cs b/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/tests/Azure.Monitor.OpenTelemetry.Exporter.Tests/TransmissionStateManagerTests.cs new file mode 100644 index 000000000000..b71f44f48258 --- /dev/null +++ b/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/tests/Azure.Monitor.OpenTelemetry.Exporter.Tests/TransmissionStateManagerTests.cs @@ -0,0 +1,43 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using Azure.Core.TestFramework; +using Azure.Monitor.OpenTelemetry.Exporter.Internals; +using Xunit; + +namespace Azure.Monitor.OpenTelemetry.Exporter.Tests +{ + public class TransmissionStateManagerTests + { + [Fact] + public void EnableBackOffSetsStateToOpen() + { + var transmissionStateManager = new TransmissionStateManager(); + MockResponse mockResponse = new MockResponse(500, "Internal Server Error"); + transmissionStateManager.EnableBackOff(mockResponse); + + Assert.Equal(TransmissionState.Open, transmissionStateManager.State); + } + + [Fact] + public void EnableBackOffSetsStateToOpenUsingRetryAfterHeaderInResponse() + { + System.Timers.Timer backOffIntervalTimer = new(); + var transmissionStateManager = new TransmissionStateManager( + random: new(), + minIntervalToUpdateConsecutiveErrors: TimeSpan.FromSeconds(20), + nextMinTimeToUpdateConsecutiveErrors: DateTimeOffset.MinValue, + backOffIntervalTimer: backOffIntervalTimer, + TransmissionState.Closed + ); + + MockResponse mockResponse = new MockResponse(429, "Internal Server Error"); + mockResponse.AddHeader("Retry-After", "20"); + transmissionStateManager.EnableBackOff(mockResponse); + + Assert.Equal(20000, backOffIntervalTimer.Interval); + Assert.Equal(TransmissionState.Open, transmissionStateManager.State); + } + } +}