Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ internal class AzureMonitorTransmitter : ITransmitter
internal PersistentBlobProvider? _fileBlobProvider;
private readonly AzureMonitorStatsbeat? _statsbeat;
private readonly ConnectionVars _connectionVars;
internal readonly TransmissionStateManager _transmissionStateManager;
internal readonly TransmitFromStorageHandler? _transmitFromStorageHandler;
private bool _disposed;

public AzureMonitorTransmitter(AzureMonitorExporterOptions options)
Expand All @@ -41,8 +43,15 @@ public AzureMonitorTransmitter(AzureMonitorExporterOptions options)

_applicationInsightsRestClient = InitializeRestClient(options, _connectionVars);

_transmissionStateManager = new TransmissionStateManager();

_fileBlobProvider = InitializeOfflineStorage(options);

if (_fileBlobProvider != null)
{
_transmitFromStorageHandler = new TransmitFromStorageHandler(_applicationInsightsRestClient, _fileBlobProvider, _transmissionStateManager);
}

_statsbeat = InitializeStatsbeat(options, _connectionVars);
}

Expand Down Expand Up @@ -165,10 +174,13 @@ await _applicationInsightsRestClient.InternalTrackAsync(telemetryItems, cancella

if (result == ExportResult.Failure && _fileBlobProvider != null)
{
_transmissionStateManager.EnableBackOff(httpMessage.Response);
result = HttpPipelineHelper.HandleFailures(httpMessage, _fileBlobProvider);
}
else
{
_transmissionStateManager.ResetConsecutiveErrors();
_transmissionStateManager.CloseTransmission();
AzureMonitorExporterEventSource.Log.WriteInformational("TransmissionSuccess", "Successfully transmitted a batch of telemetry Items.");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ internal void CloseTransmission()
/// </summary>
internal void ResetConsecutiveErrors()
{
_nextMinTimeToUpdateConsecutiveErrors = DateTimeOffset.MinValue;
Interlocked.Exchange(ref _consecutiveErrors, 0);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace Azure.Monitor.OpenTelemetry.Exporter.Internals
internal class TransmitFromStorageHandler : IDisposable
{
private readonly ApplicationInsightsRestClient _applicationInsightsRestClient;
private readonly PersistentBlobProvider _blobProvider;
internal PersistentBlobProvider _blobProvider;
private readonly TransmissionStateManager _transmissionStateManager;
private readonly System.Timers.Timer _transmitFromStorageTimer;
private bool _disposed;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,13 @@ public void TelemetryIsTransmittedSuccessfullyFromStorage()
Assert.NotNull(transmitter._fileBlobProvider);
Assert.Single(transmitter._fileBlobProvider.GetBlobs());

var transmitFromStorageHandler = new TransmitFromStorageHandler(transmitter._applicationInsightsRestClient, transmitter._fileBlobProvider, new TransmissionStateManager());
transmitFromStorageHandler.TransmitFromStorage(null, null);
Assert.Equal(TransmissionState.Open, transmitter._transmissionStateManager.State);

// Reset transmission state
transmitter._transmissionStateManager.ResetConsecutiveErrors();
transmitter._transmissionStateManager.CloseTransmission();

transmitter._transmitFromStorageHandler?.TransmitFromStorage(null, null);

// Assert
// Blob will be deleted on successful transmission
Expand Down Expand Up @@ -184,18 +189,9 @@ public void TelemetryIsNotTransmittedWhenTransmissionStateIsOpen()
Assert.NotNull(transmitter._fileBlobProvider);
Assert.Single(transmitter._fileBlobProvider.GetBlobs());

var transmissionStateManager = new TransmissionStateManager(
random: new(),
minIntervalToUpdateConsecutiveErrors: TimeSpan.FromSeconds(20),
nextMinTimeToUpdateConsecutiveErrors: DateTimeOffset.MinValue,
backOffIntervalTimer: new System.Timers.Timer(),
TransmissionState.Open
);

Assert.Equal(TransmissionState.Open, transmissionStateManager.State);
Assert.Equal(TransmissionState.Open, transmitter._transmissionStateManager.State);

var transmitFromStorageHandler = new TransmitFromStorageHandler(transmitter._applicationInsightsRestClient, transmitter._fileBlobProvider, transmissionStateManager);
transmitFromStorageHandler.TransmitFromStorage(null, null);
transmitter._transmitFromStorageHandler?.TransmitFromStorage(null, null);

// Assert
// Blob will not be deleted as the transmission state is open.
Expand All @@ -221,16 +217,17 @@ public void TransmissionStateIsSetToOpenOnFailedRequest()
Assert.NotNull(transmitter._fileBlobProvider);
Assert.Single(transmitter._fileBlobProvider.GetBlobs());

var transmissionStateManager = new TransmissionStateManager();
var transmitFromStorageHandler = new TransmitFromStorageHandler(transmitter._applicationInsightsRestClient, transmitter._fileBlobProvider, transmissionStateManager);
Assert.Equal(TransmissionState.Open, transmitter._transmissionStateManager.State);

Assert.Equal(TransmissionState.Closed, transmissionStateManager.State);
// Reset transmission state
transmitter._transmissionStateManager.ResetConsecutiveErrors();
transmitter._transmissionStateManager.CloseTransmission();

transmitFromStorageHandler.TransmitFromStorage(null, null);
transmitter._transmitFromStorageHandler?.TransmitFromStorage(null, null);

// Assert
// Blob will not be deleted as the transmission state is open.
Assert.Equal(TransmissionState.Open, transmissionStateManager.State);
Assert.Equal(TransmissionState.Open, transmitter._transmissionStateManager.State);
Assert.Single(transmitter._fileBlobProvider.GetBlobs());

transmitter.Dispose();
Expand All @@ -250,6 +247,10 @@ private static AzureMonitorTransmitter GetTransmitter(params MockResponse[] mock

// Overwrite storage with mock
transmitter._fileBlobProvider = new MockFileProvider();
if (transmitter._transmitFromStorageHandler != null)
{
transmitter._transmitFromStorageHandler._blobProvider = transmitter._fileBlobProvider;
}

return transmitter;
}
Expand Down