Skip to content

Commit

Permalink
Client Telemetry: Adds functionality to collect telemetry for collect…
Browse files Browse the repository at this point in the history
…ion cache (#3484)

* code refcator

* collection cahe changes

* collect telemetry of collction cache

* remove compilation error

* cleint telemetry models

* first draft

* null check

* test fix

* fix tests

* test fix

* move AzureVMMetadata to models folder

* remove useless import

* refactor code

* refactor

* fix test

* fix tests

* review comnts

* add db information

* fix test

* revert VM initilization from constructor to request path

* fix test

* review comments

* diagnostic handler null assignment

* fix test

* minor change

Co-authored-by: Sourabh Jain <[email protected]>
  • Loading branch information
sourabh1007 and sourabh1007 authored Oct 17, 2022
1 parent 63a846d commit cd871a2
Show file tree
Hide file tree
Showing 33 changed files with 480 additions and 191 deletions.
70 changes: 65 additions & 5 deletions Microsoft.Azure.Cosmos/src/DocumentClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ namespace Microsoft.Azure.Cosmos
using global::Azure.Core;
using Microsoft.Azure.Cosmos.Common;
using Microsoft.Azure.Cosmos.Core.Trace;
using Microsoft.Azure.Cosmos.Handler;
using Microsoft.Azure.Cosmos.Query;
using Microsoft.Azure.Cosmos.Query.Core.QueryPlan;
using Microsoft.Azure.Cosmos.Routing;
Expand Down Expand Up @@ -132,11 +133,13 @@ internal partial class DocumentClient : IDisposable, IAuthorizationTokenProvider
private int rntbdSendHangDetectionTimeSeconds = DefaultRntbdSendHangDetectionTimeSeconds;
private bool enableCpuMonitor = DefaultEnableCpuMonitor;
private int rntbdMaxConcurrentOpeningConnectionCount = 5;
private string clientId;

//Consistency
private Documents.ConsistencyLevel? desiredConsistencyLevel;

internal CosmosAccountServiceConfiguration accountServiceConfiguration { get; private set; }
internal ClientTelemetry clientTelemetry { get; set; }

private ClientCollectionCache collectionCache;

Expand Down Expand Up @@ -417,6 +420,7 @@ internal DocumentClient(Uri serviceEndpoint,
/// <param name="transportClientHandlerFactory">Transport client handler factory.</param>
/// <param name="storeClientFactory">Factory that creates store clients sharing the same transport client to optimize network resource reuse across multiple document clients in the same process.</param>
/// <param name="isLocalQuorumConsistency">Flag to allow Quorum Read with Eventual Consistency Account</param>
/// <param name="cosmosClientId"></param>
/// <remarks>
/// The service endpoint can be obtained from the Azure Management Portal.
/// If you are connecting using one of the Master Keys, these can be obtained along with the endpoint from the Azure Management Portal
Expand All @@ -441,7 +445,8 @@ internal DocumentClient(Uri serviceEndpoint,
bool? enableCpuMonitor = null,
Func<TransportClient, TransportClient> transportClientHandlerFactory = null,
IStoreClientFactory storeClientFactory = null,
bool isLocalQuorumConsistency = false)
bool isLocalQuorumConsistency = false,
string cosmosClientId = null)
{
if (sendingRequestEventArgs != null)
{
Expand Down Expand Up @@ -472,7 +477,8 @@ internal DocumentClient(Uri serviceEndpoint,
handler: handler,
sessionContainer: sessionContainer,
enableCpuMonitor: enableCpuMonitor,
storeClientFactory: storeClientFactory);
storeClientFactory: storeClientFactory,
cosmosClientId: cosmosClientId);
}

/// <summary>
Expand Down Expand Up @@ -634,7 +640,12 @@ private async Task OpenPrivateAsync(CancellationToken cancellationToken)
catch (DocumentClientException ex)
{
// Clear the caches to ensure that we don't have partial results
this.collectionCache = new ClientCollectionCache(this.sessionContainer, this.GatewayStoreModel, this, this.retryPolicy);
this.collectionCache = new ClientCollectionCache(
sessionContainer: this.sessionContainer,
storeModel: this.GatewayStoreModel,
tokenProvider: this,
retryPolicy: this.retryPolicy,
clientTelemetry: this.clientTelemetry);
this.partitionKeyRangeCache = new PartitionKeyRangeCache(this, this.GatewayStoreModel, this.collectionCache);

DefaultTrace.TraceWarning("{0} occurred while OpenAsync. Exception Message: {1}", ex.ToString(), ex.Message);
Expand All @@ -648,13 +659,16 @@ internal virtual void Initialize(Uri serviceEndpoint,
ISessionContainer sessionContainer = null,
bool? enableCpuMonitor = null,
IStoreClientFactory storeClientFactory = null,
TokenCredential tokenCredential = null)
TokenCredential tokenCredential = null,
string cosmosClientId = null)
{
if (serviceEndpoint == null)
{
throw new ArgumentNullException("serviceEndpoint");
}

this.clientId = cosmosClientId;

this.queryPartitionProvider = new AsyncLazy<QueryPartitionProvider>(async () =>
{
await this.EnsureValidClientAsync(NoOpTrace.Singleton);
Expand Down Expand Up @@ -928,6 +942,12 @@ internal virtual void Initialize(Uri serviceEndpoint,
// For direct: WFStoreProxy [set in OpenAsync()].
this.eventSource = DocumentClientEventSource.Instance;

// Disable system usage for internal builds. Cosmos DB owns the VMs and already logs
// the system information so no need to track it.
#if !INTERNAL
this.InitializeClientTelemetry();
#endif

this.initializeTaskFactory = (_) => TaskHelper.InlineIfPossible<bool>(
() => this.GetInitializationTaskAsync(storeClientFactory: storeClientFactory),
new ResourceThrottleRetryPolicy(
Expand Down Expand Up @@ -984,7 +1004,12 @@ private async Task<bool> GetInitializationTaskAsync(IStoreClientFactory storeCli

this.GatewayStoreModel = gatewayStoreModel;

this.collectionCache = new ClientCollectionCache(this.sessionContainer, this.GatewayStoreModel, this, this.retryPolicy);
this.collectionCache = new ClientCollectionCache(
sessionContainer: this.sessionContainer,
storeModel: this.GatewayStoreModel,
tokenProvider: this,
retryPolicy: this.retryPolicy,
clientTelemetry: this.clientTelemetry);
this.partitionKeyRangeCache = new PartitionKeyRangeCache(this, this.GatewayStoreModel, this.collectionCache);
this.ResetSessionTokenRetryPolicy = new ResetSessionTokenRetryPolicyFactory(this.sessionContainer, this.collectionCache, this.retryPolicy);

Expand All @@ -1002,6 +1027,36 @@ private async Task<bool> GetInitializationTaskAsync(IStoreClientFactory storeCli
return true;
}

private void InitializeClientTelemetry()
{
if (this.ConnectionPolicy.EnableClientTelemetry)
{
try
{
this.clientTelemetry = ClientTelemetry.CreateAndStartBackgroundTelemetry(
clientId: this.clientId,
httpClient: this.httpClient,
userAgent: this.ConnectionPolicy.UserAgentContainer.UserAgent,
connectionMode: this.ConnectionPolicy.ConnectionMode,
authorizationTokenProvider: this.cosmosAuthorization,
diagnosticsHelper: DiagnosticsHandlerHelper.Instance,
preferredRegions: this.ConnectionPolicy.PreferredLocations,
globalEndpointManager: this.GlobalEndpointManager);

DefaultTrace.TraceInformation("Client Telemetry Enabled.");
}
catch (Exception ex)
{
DefaultTrace.TraceInformation($"Error While starting Telemetry Job : {ex.Message}. Hence disabling Client Telemetry");
this.ConnectionPolicy.EnableClientTelemetry = false;
}
}
else
{
DefaultTrace.TraceInformation("Client Telemetry Disabled.");
}
}

private async Task InitializeCachesAsync(string databaseName, DocumentCollection collection, CancellationToken cancellationToken)
{
if (databaseName == null)
Expand Down Expand Up @@ -1272,6 +1327,11 @@ public void Dispose()
this.initTaskCache = null;
}

if (this.clientTelemetry != null)
{
this.clientTelemetry.Dispose();
}

DefaultTrace.TraceInformation("DocumentClient with id {0} disposed.", this.traceId);
DefaultTrace.Flush();

Expand Down
8 changes: 3 additions & 5 deletions Microsoft.Azure.Cosmos/src/Handler/ClientPipelineBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,14 @@ public ClientPipelineBuilder(
#if !INTERNAL
this.diagnosticsHandler = new DiagnosticsHandler();
Debug.Assert(this.diagnosticsHandler.InnerHandler == null, nameof(this.diagnosticsHandler));

#else
this.diagnosticsHandler = null;
#endif
if (telemetry != null)
{
this.telemetryHandler = new TelemetryHandler(telemetry);
Debug.Assert(this.telemetryHandler.InnerHandler == null, nameof(this.telemetryHandler));
}
#else
this.diagnosticsHandler = null;
this.telemetryHandler = null;
#endif

this.UseRetryPolicy();
this.AddCustomHandlers(customHandlers);
Expand Down
4 changes: 2 additions & 2 deletions Microsoft.Azure.Cosmos/src/Handler/TelemetryHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public override async Task<ResponseMessage> SendAsync(
try
{
this.telemetry
.Collect(
.CollectOperationInfo(
cosmosDiagnostics: response.Diagnostics,
statusCode: response.StatusCode,
responseSizeInBytes: this.GetPayloadSize(response),
Expand All @@ -40,7 +40,7 @@ public override async Task<ResponseMessage> SendAsync(
resourceType: request.ResourceType,
consistencyLevel: request.Headers?[Documents.HttpConstants.HttpHeaders.ConsistencyLevel],
requestCharge: response.Headers.RequestCharge,
subStatusCode: response.Headers.SubStatusCodeLiteral);
subStatusCode: response.Headers.SubStatusCode);
}
catch (Exception ex)
{
Expand Down
38 changes: 4 additions & 34 deletions Microsoft.Azure.Cosmos/src/Resource/ClientContextCore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ internal class ClientContextCore : CosmosClientContext
private readonly CosmosResponseFactoryInternal responseFactory;
private readonly RequestInvokerHandler requestHandler;
private readonly CosmosClientOptions clientOptions;
private readonly ClientTelemetry telemetry;

private readonly string userAgent;
private bool isDisposed = false;
Expand All @@ -53,7 +52,6 @@ private ClientContextCore(
this.documentClient = documentClient;
this.userAgent = userAgent;
this.batchExecutorCache = batchExecutorCache;
this.telemetry = telemetry;
}

internal static CosmosClientContext Create(
Expand Down Expand Up @@ -81,7 +79,8 @@ internal static CosmosClientContext Create(
storeClientFactory: clientOptions.StoreClientFactory,
desiredConsistencyLevel: clientOptions.GetDocumentsConsistencyLevel(),
handler: httpMessageHandler,
sessionContainer: clientOptions.SessionContainer);
sessionContainer: clientOptions.SessionContainer,
cosmosClientId: cosmosClient.Id);

return ClientContextCore.Create(
cosmosClient,
Expand All @@ -107,42 +106,14 @@ internal static CosmosClientContext Create(

clientOptions = ClientContextCore.CreateOrCloneClientOptions(clientOptions);

ConnectionPolicy connectionPolicy = clientOptions.GetConnectionPolicy(cosmosClient.ClientId);
ClientTelemetry telemetry = null;
if (connectionPolicy.EnableClientTelemetry)
{
try
{
telemetry = ClientTelemetry.CreateAndStartBackgroundTelemetry(
clientId: cosmosClient.Id,
documentClient: documentClient,
userAgent: connectionPolicy.UserAgentContainer.UserAgent,
connectionMode: connectionPolicy.ConnectionMode,
authorizationTokenProvider: cosmosClient.AuthorizationTokenProvider,
diagnosticsHelper: DiagnosticsHandlerHelper.Instance,
preferredRegions: clientOptions.ApplicationPreferredRegions);

}
catch (Exception ex)
{
DefaultTrace.TraceInformation($"Error While starting Telemetry Job : {ex.Message}. Hence disabling Client Telemetry");
connectionPolicy.EnableClientTelemetry = false;
}

}
else
{
DefaultTrace.TraceInformation("Client Telemetry Disabled.");
}

if (requestInvokerHandler == null)
{
//Request pipeline
ClientPipelineBuilder clientPipelineBuilder = new ClientPipelineBuilder(
cosmosClient,
clientOptions.ConsistencyLevel,
clientOptions.CustomHandlers,
telemetry: telemetry);
telemetry: documentClient.clientTelemetry);

requestInvokerHandler = clientPipelineBuilder.Build();
}
Expand All @@ -165,7 +136,7 @@ internal static CosmosClientContext Create(
documentClient: documentClient,
userAgent: documentClient.ConnectionPolicy.UserAgentContainer.UserAgent,
batchExecutorCache: new BatchAsyncContainerExecutorCache(),
telemetry: telemetry);
telemetry: documentClient.clientTelemetry);
}

/// <summary>
Expand Down Expand Up @@ -458,7 +429,6 @@ protected virtual void Dispose(bool disposing)
{
if (disposing)
{
this.telemetry?.Dispose();
this.batchExecutorCache.Dispose();
this.DocumentClient.Dispose();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,8 @@ internal static void RecordOtelAttributes(CosmosException exception, DiagnosticS
{
scope.AddAttribute(OpenTelemetryAttributeKeys.StatusCode, exception.StatusCode);
scope.AddAttribute(OpenTelemetryAttributeKeys.RequestCharge, exception.RequestCharge);
scope.AddAttribute(OpenTelemetryAttributeKeys.Region, ClientTelemetryHelper.GetContactedRegions(exception.Diagnostics));
scope.AddAttribute(OpenTelemetryAttributeKeys.Region,
ClientTelemetryHelper.GetContactedRegions(exception.Diagnostics?.GetContactedRegions()));
scope.AddAttribute(OpenTelemetryAttributeKeys.ExceptionMessage, exception.Message);

CosmosDbEventSource.RecordDiagnosticsForExceptions(exception.Diagnostics);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ public override string ToString()

internal static void RecordOtelAttributes(CosmosNullReferenceException exception, DiagnosticScope scope)
{
scope.AddAttribute(OpenTelemetryAttributeKeys.Region, ClientTelemetryHelper.GetContactedRegions(exception.Diagnostics));
scope.AddAttribute(OpenTelemetryAttributeKeys.Region,
ClientTelemetryHelper.GetContactedRegions(exception.Diagnostics?.GetContactedRegions()));
scope.AddAttribute(OpenTelemetryAttributeKeys.ExceptionMessage, exception.GetBaseException().Message);

CosmosDbEventSource.RecordDiagnosticsForExceptions(exception.Diagnostics);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ public override string ToString()
/// <param name="scope"></param>
internal static void RecordOtelAttributes(CosmosObjectDisposedException exception, DiagnosticScope scope)
{
scope.AddAttribute(OpenTelemetryAttributeKeys.Region, ClientTelemetryHelper.GetContactedRegions(exception.Diagnostics));
scope.AddAttribute(OpenTelemetryAttributeKeys.Region,
ClientTelemetryHelper.GetContactedRegions(exception.Diagnostics?.GetContactedRegions()));
scope.AddAttribute(OpenTelemetryAttributeKeys.ExceptionMessage, exception.Message);

CosmosDbEventSource.RecordDiagnosticsForExceptions(exception.Diagnostics);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,10 @@ private Lazy<string> CreateToStringMessage()
/// <param name="scope"></param>
internal static void RecordOtelAttributes(CosmosOperationCanceledException exception, DiagnosticScope scope)
{
scope.AddAttribute(OpenTelemetryAttributeKeys.Region, ClientTelemetryHelper.GetContactedRegions(exception.Diagnostics));
scope.AddAttribute(OpenTelemetryAttributeKeys.ExceptionMessage, exception.GetBaseException().Message);
scope.AddAttribute(OpenTelemetryAttributeKeys.Region,
ClientTelemetryHelper.GetContactedRegions(exception.Diagnostics?.GetContactedRegions()));
scope.AddAttribute(OpenTelemetryAttributeKeys.ExceptionMessage,
exception.GetBaseException().Message);

CosmosDbEventSource.RecordDiagnosticsForExceptions(exception.Diagnostics);
}
Expand Down
Loading

0 comments on commit cd871a2

Please sign in to comment.