diff --git a/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/AzureMonitorTransmitter.cs b/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/AzureMonitorTransmitter.cs index c70437145665..ac374882ca76 100644 --- a/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/AzureMonitorTransmitter.cs +++ b/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/AzureMonitorTransmitter.cs @@ -24,7 +24,7 @@ namespace Azure.Monitor.OpenTelemetry.Exporter.Internals /// internal class AzureMonitorTransmitter : ITransmitter { - private readonly ApplicationInsightsRestClient _applicationInsightsRestClient; + internal readonly ApplicationInsightsRestClient _applicationInsightsRestClient; internal PersistentBlobProvider? _fileBlobProvider; private readonly AzureMonitorStatsbeat? _statsbeat; private readonly ConnectionVars _connectionVars; diff --git a/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/HttpPipelineHelper.cs b/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/HttpPipelineHelper.cs index 5e26b4ab9fb4..c012390c5415 100644 --- a/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/HttpPipelineHelper.cs +++ b/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/HttpPipelineHelper.cs @@ -9,6 +9,9 @@ using Azure.Core; using Azure.Monitor.OpenTelemetry.Exporter.Models; +using OpenTelemetry; +using OpenTelemetry.Extensions.PersistentStorage.Abstractions; +using Azure.Monitor.OpenTelemetry.Exporter.Internals.PersistentStorage; namespace Azure.Monitor.OpenTelemetry.Exporter.Internals { @@ -126,5 +129,63 @@ internal static bool TryGetRetryIntervalTimespan(Response? httpResponse, out Tim return Encoding.UTF8.GetBytes(partialContent); } + + internal static ExportResult IsSuccess(HttpMessage httpMessage) + { + if (httpMessage.HasResponse && httpMessage.Response.Status == ResponseStatusCodes.Success) + { + return ExportResult.Success; + } + + return ExportResult.Failure; + } + + internal static void HandleFailures(HttpMessage httpMessage, PersistentBlob blob, PersistentBlobProvider blobProvider) + { + int statusCode = 0; + bool shouldRetry = true; + + if (httpMessage.HasResponse) + { + statusCode = httpMessage.Response.Status; + switch (statusCode) + { + case ResponseStatusCodes.PartialSuccess: + // Parse retry-after header + // Send Failed Messages To Storage + // Delete existing file + TrackResponse trackResponse = GetTrackResponse(httpMessage); + var content = GetPartialContentForRetry(trackResponse, httpMessage.Request.Content); + if (content != null) + { + blob.TryDelete(); + blobProvider.SaveTelemetry(content); + } + break; + case ResponseStatusCodes.RequestTimeout: + case ResponseStatusCodes.ResponseCodeTooManyRequests: + case ResponseStatusCodes.ResponseCodeTooManyRequestsAndRefreshCache: + case ResponseStatusCodes.Unauthorized: + case ResponseStatusCodes.Forbidden: + case ResponseStatusCodes.InternalServerError: + case ResponseStatusCodes.BadGateway: + case ResponseStatusCodes.ServiceUnavailable: + case ResponseStatusCodes.GatewayTimeout: + break; + default: + shouldRetry = false; + break; + } + } + + if (shouldRetry) + { + AzureMonitorExporterEventSource.Log.WriteWarning("FailedToTransmitFromStorage", $"Error code is {statusCode}: Telemetry is stored offline for retry"); + } + else + { + AzureMonitorExporterEventSource.Log.WriteWarning("FailedToTransmitFromStorage", $"Error code is {statusCode}: Telemetry is dropped"); + } + } } } diff --git a/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/PersistentStorage/PersistentStorageExtensions.cs b/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/PersistentStorage/PersistentStorageExtensions.cs index cdf72c343dd2..6e2091cfb13d 100644 --- a/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/PersistentStorage/PersistentStorageExtensions.cs +++ b/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/PersistentStorage/PersistentStorageExtensions.cs @@ -12,5 +12,10 @@ internal static ExportResult SaveTelemetry(this PersistentBlobProvider storage, { return storage.TryCreateBlob(content, leaseTime, out _) ? ExportResult.Success : ExportResult.Failure; } + + internal static ExportResult SaveTelemetry(this PersistentBlobProvider storage, byte[] content) + { + return storage.TryCreateBlob(content, out _) ? ExportResult.Success : ExportResult.Failure; + } } } diff --git a/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/TransmitFromStorageHandler.cs b/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/TransmitFromStorageHandler.cs new file mode 100644 index 000000000000..51909d7e8853 --- /dev/null +++ b/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/TransmitFromStorageHandler.cs @@ -0,0 +1,100 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Threading; +using System.Timers; +using OpenTelemetry; +using OpenTelemetry.Extensions.PersistentStorage.Abstractions; + +namespace Azure.Monitor.OpenTelemetry.Exporter.Internals +{ + internal class TransmitFromStorageHandler : IDisposable + { + private readonly ApplicationInsightsRestClient _applicationInsightsRestClient; + private readonly PersistentBlobProvider _blobProvider; + private readonly TransmissionStateManager _transmissionStateManager; + private readonly System.Timers.Timer _transmitFromStorageTimer; + private bool _disposed; + + internal TransmitFromStorageHandler(ApplicationInsightsRestClient applicationInsightsRestClient, PersistentBlobProvider blobProvider, TransmissionStateManager transmissionStateManager) + { + _applicationInsightsRestClient = applicationInsightsRestClient; + _blobProvider = blobProvider; + _transmissionStateManager = transmissionStateManager; + _transmitFromStorageTimer = new System.Timers.Timer(); + _transmitFromStorageTimer.Elapsed += TransmitFromStorage; + _transmitFromStorageTimer.AutoReset = true; + _transmitFromStorageTimer.Interval = 120000; + _transmitFromStorageTimer.Start(); + } + + internal void TransmitFromStorage(object? sender, ElapsedEventArgs? e) + { + // Only proces 10 files at a time so that we don't end up taking lot of cpu + // if the number of files are large. + int fileCount = 10; + while (fileCount > 0) + { + if (_transmissionStateManager.State == TransmissionState.Closed && _blobProvider.TryGetBlob(out var blob) && blob.TryLease(120000)) + { + try + { + blob.TryRead(out var data); + + using var httpMessage = _applicationInsightsRestClient.InternalTrackAsync(data, CancellationToken.None).Result; + var result = HttpPipelineHelper.IsSuccess(httpMessage); + + if (result == ExportResult.Success) + { + _transmissionStateManager.ResetConsecutiveErrors(); + _transmissionStateManager.CloseTransmission(); + + AzureMonitorExporterEventSource.Log.WriteInformational("TransmitFromStorageSuccess", "Successfully transmitted a blob from storage."); + + // In case if the delete fails, there is a possibility + // that the current batch will be transmitted more than once resulting in duplicates. + blob.TryDelete(); + } + else + { + _transmissionStateManager.EnableBackOff(httpMessage.Response); + HttpPipelineHelper.HandleFailures(httpMessage, blob, _blobProvider); + break; + } + } + catch (Exception ex) + { + AzureMonitorExporterEventSource.Log.WriteError("FailedToTransmitFromStorage", ex); + } + } + else + { + break; + } + + fileCount--; + } + } + + protected virtual void Dispose(bool disposing) + { + if (!_disposed) + { + if (disposing) + { + _transmitFromStorageTimer?.Dispose(); + } + + _disposed = true; + } + } + + public void Dispose() + { + // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method + Dispose(disposing: true); + GC.SuppressFinalize(this); + } + } +} diff --git a/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/tests/Azure.Monitor.OpenTelemetry.Exporter.Tests/OfflineStorageTests.cs b/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/tests/Azure.Monitor.OpenTelemetry.Exporter.Tests/OfflineStorageTests.cs index 00779ed3e088..cafbe53d2d1c 100644 --- a/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/tests/Azure.Monitor.OpenTelemetry.Exporter.Tests/OfflineStorageTests.cs +++ b/sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/tests/Azure.Monitor.OpenTelemetry.Exporter.Tests/OfflineStorageTests.cs @@ -159,7 +159,7 @@ public void TransmitFromStorage() // reset server logic to return 200 mockResponse = new MockResponse(200).SetContent("{\"itemsReceived\": 1,\"itemsAccepted\": 1,\"errors\":[]}"); - transmitter = GetTransmitter(mockResponse); + transmitter = GetTransmitter(new[] { mockResponse }); transmitter._fileBlobProvider = mockFileProvider; transmitter.TransmitFromStorage(1, false, CancellationToken.None).EnsureCompleted(); @@ -169,7 +169,108 @@ public void TransmitFromStorage() Assert.Empty(transmitter._fileBlobProvider.GetBlobs()); } - private static AzureMonitorTransmitter GetTransmitter(MockResponse mockResponse) + // TODO: Remove TransmitFromStorage() test after moving to new implementation + [Fact] + public void TelemetryIsTransmittedSuccessfullyFromStorage() + { + using var activity = CreateActivity("TestActivity"); + var telemetryItem = CreateTelemetryItem(activity); + List telemetryItems = new List(); + telemetryItems.Add(telemetryItem); + + // Transmit + var mockResponseError = new MockResponse(500).SetContent("Internal Server Error"); + var mockResponseSuccess = new MockResponse(200).SetContent("{\"itemsReceived\": 1,\"itemsAccepted\": 1,\"errors\":[]}"); + var transmitter = GetTransmitter(mockResponseError, mockResponseSuccess); + + transmitter.TrackAsync(telemetryItems, false, CancellationToken.None).EnsureCompleted(); + + //Assert + Assert.NotNull(transmitter._fileBlobProvider); + Assert.Single(transmitter._fileBlobProvider.GetBlobs()); + + var transmitFromStorageHandler = new TransmitFromStorageHandler(transmitter._applicationInsightsRestClient, transmitter._fileBlobProvider, new TransmissionStateManager()); + transmitFromStorageHandler.TransmitFromStorage(null, null); + + // Assert + // Blob will be deleted on successful transmission + Assert.Empty(transmitter._fileBlobProvider.GetBlobs()); + + transmitter.Dispose(); + } + + [Fact] + public void TelemetryIsNotTransmittedWhenTransmissionStateIsOpen() + { + using var activity = CreateActivity("TestActivity"); + var telemetryItem = CreateTelemetryItem(activity); + List telemetryItems = new List(); + telemetryItems.Add(telemetryItem); + + // Transmit + var mockResponseError = new MockResponse(500).SetContent("Internal Server Error"); + var mockResponseSuccess = new MockResponse(200).SetContent("{\"itemsReceived\": 1,\"itemsAccepted\": 1,\"errors\":[]}"); + var transmitter = GetTransmitter(mockResponseError, mockResponseSuccess); + + transmitter.TrackAsync(telemetryItems, false, CancellationToken.None).EnsureCompleted(); + + //Assert + Assert.NotNull(transmitter._fileBlobProvider); + Assert.Single(transmitter._fileBlobProvider.GetBlobs()); + + var transmissionStateManager = new TransmissionStateManager( + random: new(), + minIntervalToUpdateConsecutiveErrors: TimeSpan.FromSeconds(20), + nextMinTimeToUpdateConsecutiveErrors: DateTimeOffset.MinValue, + backOffIntervalTimer: new System.Timers.Timer(), + TransmissionState.Open + ); + + Assert.Equal(TransmissionState.Open, transmissionStateManager.State); + + var transmitFromStorageHandler = new TransmitFromStorageHandler(transmitter._applicationInsightsRestClient, transmitter._fileBlobProvider, transmissionStateManager); + transmitFromStorageHandler.TransmitFromStorage(null, null); + + // Assert + // Blob will not be deleted as the transmission state is open. + Assert.Single(transmitter._fileBlobProvider.GetBlobs()); + + transmitter.Dispose(); + } + + [Fact] + public void TransmissionStateIsSetToOpenOnFailedRequest() + { + using var activity = CreateActivity("TestActivity"); + var telemetryItem = CreateTelemetryItem(activity); + List telemetryItems = new List(); + telemetryItems.Add(telemetryItem); +; + // Transmit + var mockResponse = new MockResponse(500).SetContent("Internal Server Error"); + var transmitter = GetTransmitter(mockResponse, mockResponse); + transmitter.TrackAsync(telemetryItems, false, CancellationToken.None).EnsureCompleted(); + + //Assert + Assert.NotNull(transmitter._fileBlobProvider); + Assert.Single(transmitter._fileBlobProvider.GetBlobs()); + + var transmissionStateManager = new TransmissionStateManager(); + var transmitFromStorageHandler = new TransmitFromStorageHandler(transmitter._applicationInsightsRestClient, transmitter._fileBlobProvider, transmissionStateManager); + + Assert.Equal(TransmissionState.Closed, transmissionStateManager.State); + + transmitFromStorageHandler.TransmitFromStorage(null, null); + + // Assert + // Blob will not be deleted as the transmission state is open. + Assert.Equal(TransmissionState.Open, transmissionStateManager.State); + Assert.Single(transmitter._fileBlobProvider.GetBlobs()); + + transmitter.Dispose(); + } + + private static AzureMonitorTransmitter GetTransmitter(params MockResponse[] mockResponse) { MockTransport mockTransport = new MockTransport(mockResponse); AzureMonitorExporterOptions options = new AzureMonitorExporterOptions