Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ namespace Azure.Monitor.OpenTelemetry.Exporter.Internals
/// </summary>
internal class AzureMonitorTransmitter : ITransmitter
{
private readonly ApplicationInsightsRestClient _applicationInsightsRestClient;
internal readonly ApplicationInsightsRestClient _applicationInsightsRestClient;
Comment thread
vishweshbankwar marked this conversation as resolved.
internal PersistentBlobProvider? _fileBlobProvider;
private readonly AzureMonitorStatsbeat? _statsbeat;
private readonly ConnectionVars _connectionVars;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@

using Azure.Core;
using Azure.Monitor.OpenTelemetry.Exporter.Models;
using OpenTelemetry;
using OpenTelemetry.Extensions.PersistentStorage.Abstractions;
using Azure.Monitor.OpenTelemetry.Exporter.Internals.PersistentStorage;

namespace Azure.Monitor.OpenTelemetry.Exporter.Internals
{
Expand Down Expand Up @@ -126,5 +129,63 @@ internal static bool TryGetRetryIntervalTimespan(Response? httpResponse, out Tim

return Encoding.UTF8.GetBytes(partialContent);
}

internal static ExportResult IsSuccess(HttpMessage httpMessage)
{
if (httpMessage.HasResponse && httpMessage.Response.Status == ResponseStatusCodes.Success)
{
return ExportResult.Success;
}

return ExportResult.Failure;
}

internal static void HandleFailures(HttpMessage httpMessage, PersistentBlob blob, PersistentBlobProvider blobProvider)
{
int statusCode = 0;
bool shouldRetry = true;

if (httpMessage.HasResponse)
{
statusCode = httpMessage.Response.Status;
switch (statusCode)
{
case ResponseStatusCodes.PartialSuccess:
// Parse retry-after header
// Send Failed Messages To Storage
Comment thread
rajkumar-rangaraj marked this conversation as resolved.
// Delete existing file
TrackResponse trackResponse = GetTrackResponse(httpMessage);
var content = GetPartialContentForRetry(trackResponse, httpMessage.Request.Content);
if (content != null)
{
blob.TryDelete();
blobProvider.SaveTelemetry(content);
}
break;
case ResponseStatusCodes.RequestTimeout:
case ResponseStatusCodes.ResponseCodeTooManyRequests:
case ResponseStatusCodes.ResponseCodeTooManyRequestsAndRefreshCache:
case ResponseStatusCodes.Unauthorized:
case ResponseStatusCodes.Forbidden:
case ResponseStatusCodes.InternalServerError:
case ResponseStatusCodes.BadGateway:
case ResponseStatusCodes.ServiceUnavailable:
case ResponseStatusCodes.GatewayTimeout:
break;
default:
shouldRetry = false;
break;
}
}

if (shouldRetry)
{
AzureMonitorExporterEventSource.Log.WriteWarning("FailedToTransmitFromStorage", $"Error code is {statusCode}: Telemetry is stored offline for retry");
}
else
{
AzureMonitorExporterEventSource.Log.WriteWarning("FailedToTransmitFromStorage", $"Error code is {statusCode}: Telemetry is dropped");
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,10 @@ internal static ExportResult SaveTelemetry(this PersistentBlobProvider storage,
{
return storage.TryCreateBlob(content, leaseTime, out _) ? ExportResult.Success : ExportResult.Failure;
}

internal static ExportResult SaveTelemetry(this PersistentBlobProvider storage, byte[] content)
{
return storage.TryCreateBlob(content, out _) ? ExportResult.Success : ExportResult.Failure;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System;
using System.Threading;
using System.Timers;
using OpenTelemetry;
using OpenTelemetry.Extensions.PersistentStorage.Abstractions;

namespace Azure.Monitor.OpenTelemetry.Exporter.Internals
{
internal class TransmitFromStorageHandler : IDisposable
{
private readonly ApplicationInsightsRestClient _applicationInsightsRestClient;
private readonly PersistentBlobProvider _blobProvider;
private readonly TransmissionStateManager _transmissionStateManager;
private readonly System.Timers.Timer _transmitFromStorageTimer;
private bool _disposed;

internal TransmitFromStorageHandler(ApplicationInsightsRestClient applicationInsightsRestClient, PersistentBlobProvider blobProvider, TransmissionStateManager transmissionStateManager)
{
_applicationInsightsRestClient = applicationInsightsRestClient;
_blobProvider = blobProvider;
_transmissionStateManager = transmissionStateManager;
_transmitFromStorageTimer = new System.Timers.Timer();
_transmitFromStorageTimer.Elapsed += TransmitFromStorage;
_transmitFromStorageTimer.AutoReset = true;
_transmitFromStorageTimer.Interval = 120000;
Comment thread
rajkumar-rangaraj marked this conversation as resolved.
_transmitFromStorageTimer.Start();
}

internal void TransmitFromStorage(object? sender, ElapsedEventArgs? e)
{
// Only proces 10 files at a time so that we don't end up taking lot of cpu
// if the number of files are large.
int fileCount = 10;
while (fileCount > 0)
{
if (_transmissionStateManager.State == TransmissionState.Closed && _blobProvider.TryGetBlob(out var blob) && blob.TryLease(120000))
{
try
{
blob.TryRead(out var data);

using var httpMessage = _applicationInsightsRestClient.InternalTrackAsync(data, CancellationToken.None).Result;
var result = HttpPipelineHelper.IsSuccess(httpMessage);

if (result == ExportResult.Success)
{
_transmissionStateManager.ResetConsecutiveErrors();
_transmissionStateManager.CloseTransmission();

AzureMonitorExporterEventSource.Log.WriteInformational("TransmitFromStorageSuccess", "Successfully transmitted a blob from storage.");

// In case if the delete fails, there is a possibility
// that the current batch will be transmitted more than once resulting in duplicates.
blob.TryDelete();
}
else
{
_transmissionStateManager.EnableBackOff(httpMessage.Response);
HttpPipelineHelper.HandleFailures(httpMessage, blob, _blobProvider);
break;
}
}
catch (Exception ex)
{
AzureMonitorExporterEventSource.Log.WriteError("FailedToTransmitFromStorage", ex);
}
}
else
{
break;
}

fileCount--;
}
}

protected virtual void Dispose(bool disposing)
{
if (!_disposed)
{
if (disposing)
{
_transmitFromStorageTimer?.Dispose();
}

_disposed = true;
}
}

public void Dispose()
{
// Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method
Dispose(disposing: true);
GC.SuppressFinalize(this);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public void TransmitFromStorage()

// reset server logic to return 200
mockResponse = new MockResponse(200).SetContent("{\"itemsReceived\": 1,\"itemsAccepted\": 1,\"errors\":[]}");
transmitter = GetTransmitter(mockResponse);
transmitter = GetTransmitter(new[] { mockResponse });
transmitter._fileBlobProvider = mockFileProvider;

transmitter.TransmitFromStorage(1, false, CancellationToken.None).EnsureCompleted();
Expand All @@ -169,7 +169,108 @@ public void TransmitFromStorage()
Assert.Empty(transmitter._fileBlobProvider.GetBlobs());
}

private static AzureMonitorTransmitter GetTransmitter(MockResponse mockResponse)
// TODO: Remove TransmitFromStorage() test after moving to new implementation
Comment thread
vishweshbankwar marked this conversation as resolved.
[Fact]
public void TelemetryIsTransmittedSuccessfullyFromStorage()
{
using var activity = CreateActivity("TestActivity");
var telemetryItem = CreateTelemetryItem(activity);
List<TelemetryItem> telemetryItems = new List<TelemetryItem>();
telemetryItems.Add(telemetryItem);

// Transmit
var mockResponseError = new MockResponse(500).SetContent("Internal Server Error");
var mockResponseSuccess = new MockResponse(200).SetContent("{\"itemsReceived\": 1,\"itemsAccepted\": 1,\"errors\":[]}");
var transmitter = GetTransmitter(mockResponseError, mockResponseSuccess);

transmitter.TrackAsync(telemetryItems, false, CancellationToken.None).EnsureCompleted();

//Assert
Assert.NotNull(transmitter._fileBlobProvider);
Assert.Single(transmitter._fileBlobProvider.GetBlobs());

var transmitFromStorageHandler = new TransmitFromStorageHandler(transmitter._applicationInsightsRestClient, transmitter._fileBlobProvider, new TransmissionStateManager());
transmitFromStorageHandler.TransmitFromStorage(null, null);

// Assert
// Blob will be deleted on successful transmission
Assert.Empty(transmitter._fileBlobProvider.GetBlobs());

transmitter.Dispose();
}

[Fact]
public void TelemetryIsNotTransmittedWhenTransmissionStateIsOpen()
{
using var activity = CreateActivity("TestActivity");
var telemetryItem = CreateTelemetryItem(activity);
List<TelemetryItem> telemetryItems = new List<TelemetryItem>();
telemetryItems.Add(telemetryItem);

// Transmit
var mockResponseError = new MockResponse(500).SetContent("Internal Server Error");
var mockResponseSuccess = new MockResponse(200).SetContent("{\"itemsReceived\": 1,\"itemsAccepted\": 1,\"errors\":[]}");
var transmitter = GetTransmitter(mockResponseError, mockResponseSuccess);

transmitter.TrackAsync(telemetryItems, false, CancellationToken.None).EnsureCompleted();

//Assert
Assert.NotNull(transmitter._fileBlobProvider);
Assert.Single(transmitter._fileBlobProvider.GetBlobs());

var transmissionStateManager = new TransmissionStateManager(
random: new(),
minIntervalToUpdateConsecutiveErrors: TimeSpan.FromSeconds(20),
nextMinTimeToUpdateConsecutiveErrors: DateTimeOffset.MinValue,
backOffIntervalTimer: new System.Timers.Timer(),
TransmissionState.Open
);

Assert.Equal(TransmissionState.Open, transmissionStateManager.State);

var transmitFromStorageHandler = new TransmitFromStorageHandler(transmitter._applicationInsightsRestClient, transmitter._fileBlobProvider, transmissionStateManager);
transmitFromStorageHandler.TransmitFromStorage(null, null);

// Assert
// Blob will not be deleted as the transmission state is open.
Assert.Single(transmitter._fileBlobProvider.GetBlobs());

transmitter.Dispose();
}

[Fact]
public void TransmissionStateIsSetToOpenOnFailedRequest()
{
using var activity = CreateActivity("TestActivity");
var telemetryItem = CreateTelemetryItem(activity);
List<TelemetryItem> telemetryItems = new List<TelemetryItem>();
telemetryItems.Add(telemetryItem);
;
// Transmit
var mockResponse = new MockResponse(500).SetContent("Internal Server Error");
var transmitter = GetTransmitter(mockResponse, mockResponse);
transmitter.TrackAsync(telemetryItems, false, CancellationToken.None).EnsureCompleted();

//Assert
Assert.NotNull(transmitter._fileBlobProvider);
Assert.Single(transmitter._fileBlobProvider.GetBlobs());

var transmissionStateManager = new TransmissionStateManager();
var transmitFromStorageHandler = new TransmitFromStorageHandler(transmitter._applicationInsightsRestClient, transmitter._fileBlobProvider, transmissionStateManager);

Assert.Equal(TransmissionState.Closed, transmissionStateManager.State);

transmitFromStorageHandler.TransmitFromStorage(null, null);

// Assert
// Blob will not be deleted as the transmission state is open.
Assert.Equal(TransmissionState.Open, transmissionStateManager.State);
Assert.Single(transmitter._fileBlobProvider.GetBlobs());

transmitter.Dispose();
}

private static AzureMonitorTransmitter GetTransmitter(params MockResponse[] mockResponse)
{
MockTransport mockTransport = new MockTransport(mockResponse);
AzureMonitorExporterOptions options = new AzureMonitorExporterOptions
Expand Down