Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,25 @@ internal static int GetRetryInterval(Response httpResponse)
return MinimumRetryInterval;
}

internal static bool TryGetRetryIntervalTimespan(Response? httpResponse, out TimeSpan retryAfter)
{
if (httpResponse != null && httpResponse.Headers.TryGetValue(RetryAfterHeaderName, out var retryAfterValue))
{
if (int.TryParse(retryAfterValue, out var delaySeconds))
Comment thread
rajkumar-rangaraj marked this conversation as resolved.
{
retryAfter = TimeSpan.FromSeconds(delaySeconds);
return true;
}
if (DateTimeOffset.TryParse(retryAfterValue, out DateTimeOffset delayTime))
{
retryAfter = (delayTime - DateTimeOffset.Now);
return true;
}
}

return false;
}

internal static byte[]? GetRequestContent(RequestContent? content)
{
if (content == null)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

namespace Azure.Monitor.OpenTelemetry.Exporter.Internals
{
internal enum TransmissionState
Comment thread
vishweshbankwar marked this conversation as resolved.
{
/// <summary>
/// Represents disabled transmission.
/// </summary>
Open,

/// <summary>
/// Represents enabled transmission.
/// </summary>
Closed
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System;
using System.Threading;

namespace Azure.Monitor.OpenTelemetry.Exporter.Internals
{
internal class TransmissionStateManager : IDisposable
{
private const int MaxDelayInSeconds = 3600;

private const int MinDelayInSeconds = 10;

private readonly Random _random = new();
Comment thread
vishweshbankwar marked this conversation as resolved.

/// <summary>
/// Minimum time interval between failures to increment consecutive error count.
/// </summary>
private TimeSpan _minIntervalToUpdateConsecutiveErrors = TimeSpan.FromSeconds(20);

/// <summary>
/// Time threshold after which consecutive error count can be incremented.
/// </summary>
private DateTimeOffset _nextMinTimeToUpdateConsecutiveErrors = DateTimeOffset.MinValue;
Comment thread
vishweshbankwar marked this conversation as resolved.

private readonly System.Timers.Timer _backOffIntervalTimer;
Comment thread
vishweshbankwar marked this conversation as resolved.

private double _syncBackOffIntervalCalculation;

private int _consecutiveErrors;
private bool _disposed;

internal TransmissionState State { get; private set; }

internal TransmissionStateManager()
{
_backOffIntervalTimer = new();
_backOffIntervalTimer.Elapsed += ResetTransmission;
_backOffIntervalTimer.AutoReset = false;
State = TransmissionState.Closed;
}

/// <summary>
/// For test purposes.
/// </summary>
/// <param name="random"></param>
/// <param name="minIntervalToUpdateConsecutiveErrors"></param>
/// <param name="nextMinTimeToUpdateConsecutiveErrors"></param>
/// <param name="backOffIntervalTimer"></param>
/// <param name="state"></param>
internal TransmissionStateManager(
Random random,
TimeSpan minIntervalToUpdateConsecutiveErrors,
DateTimeOffset nextMinTimeToUpdateConsecutiveErrors,
System.Timers.Timer backOffIntervalTimer,
TransmissionState state)
{
_random = random;
_minIntervalToUpdateConsecutiveErrors = minIntervalToUpdateConsecutiveErrors;
_nextMinTimeToUpdateConsecutiveErrors = nextMinTimeToUpdateConsecutiveErrors;
_backOffIntervalTimer = backOffIntervalTimer;
State = state;
}

/// <summary>
/// Stops transmitting data to backend.
/// </summary>
private void OpenTransmission()
Comment thread
vishweshbankwar marked this conversation as resolved.
{
State = TransmissionState.Open;
}

/// <summary>
/// Enable transmitting data to backend.
/// To be called for each successful request or after back-off interval expiration.
/// </summary>
internal void CloseTransmission()
{
State = TransmissionState.Closed;
}

/// <summary>
/// Resets consecutive error count.
/// To be called for each successful request.
/// </summary>
internal void ResetConsecutiveErrors()
{
Interlocked.Exchange(ref _consecutiveErrors, 0);
}

internal void ResetTransmission(object source, System.Timers.ElapsedEventArgs e)
{
CloseTransmission();
}

internal void EnableBackOff(Response? response)
{
if (Interlocked.Exchange(ref _syncBackOffIntervalCalculation, 1) == 0)
{
// Do not increase number of errors more often than minimum interval.
// since we can have 4 parallel transmissions (logs, metrics, traces and offline storage tranmission) and all of them most likely would fail if we have intermittent error.
if (DateTimeOffset.UtcNow > _nextMinTimeToUpdateConsecutiveErrors)
{
Interlocked.Increment(ref _consecutiveErrors);
_nextMinTimeToUpdateConsecutiveErrors = DateTimeOffset.UtcNow + _minIntervalToUpdateConsecutiveErrors;

// If backend responded with a retryAfter header we will use it
// else we will calculate by increasing time interval exponentially.
var backOffTimeInterval = HttpPipelineHelper.TryGetRetryIntervalTimespan(response, out var retryAfterInterval) ? retryAfterInterval : GetBackOffTimeInterval();

if (backOffTimeInterval > TimeSpan.Zero)
{
OpenTransmission();

_backOffIntervalTimer.Interval = backOffTimeInterval.TotalMilliseconds;

_backOffIntervalTimer.Start();
}
}

Interlocked.Exchange(ref _syncBackOffIntervalCalculation, 0);
}
}

/// <summary>
/// Calculates the time interval for which the transmission should be halted.
/// Number of consecutive errors are taken in to account to increase the time.
/// Random variation is introduced in order to avoid collision.
/// </summary>
/// <returns>BackOff time interval.</returns>
internal TimeSpan GetBackOffTimeInterval()
{
double delayInSeconds = 0;
if (_consecutiveErrors > 0)
{
double backOffSlot = (Math.Pow(2, _consecutiveErrors) - 1) / 2;
var backOffDelay = _random.Next(1, (int)Math.Min(backOffSlot * MinDelayInSeconds, int.MaxValue));
delayInSeconds = Math.Max(Math.Min(backOffDelay, MaxDelayInSeconds), MinDelayInSeconds);
}

return TimeSpan.FromSeconds(delayInSeconds);
}

protected virtual void Dispose(bool disposing)
{
if (!_disposed)
{
if (disposing)
{
_backOffIntervalTimer?.Dispose();
}

_disposed = true;
}
}

public void Dispose()
{
// Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method
Dispose(disposing: true);
GC.SuppressFinalize(this);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System;
using Azure.Core.TestFramework;
using Azure.Monitor.OpenTelemetry.Exporter.Internals;
using Xunit;

namespace Azure.Monitor.OpenTelemetry.Exporter.Tests
{
public class TransmissionStateManagerTests
{
[Fact]
public void EnableBackOffSetsStateToOpen()
{
var transmissionStateManager = new TransmissionStateManager();
MockResponse mockResponse = new MockResponse(500, "Internal Server Error");
transmissionStateManager.EnableBackOff(mockResponse);

Assert.Equal(TransmissionState.Open, transmissionStateManager.State);
}

[Fact]
public void EnableBackOffSetsStateToOpenUsingRetryAfterHeaderInResponse()
{
System.Timers.Timer backOffIntervalTimer = new();
var transmissionStateManager = new TransmissionStateManager(
random: new(),
minIntervalToUpdateConsecutiveErrors: TimeSpan.FromSeconds(20),
nextMinTimeToUpdateConsecutiveErrors: DateTimeOffset.MinValue,
backOffIntervalTimer: backOffIntervalTimer,
TransmissionState.Closed
);

MockResponse mockResponse = new MockResponse(429, "Internal Server Error");
mockResponse.AddHeader("Retry-After", "20");
transmissionStateManager.EnableBackOff(mockResponse);

Assert.Equal(20000, backOffIntervalTimer.Interval);
Assert.Equal(TransmissionState.Open, transmissionStateManager.State);
}
}
}