Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 21 additions & 46 deletions Microsoft.Azure.Cosmos/src/DocumentClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,8 @@ internal partial class DocumentClient : IDisposable, IAuthorizationTokenProvider
private Documents.ConsistencyLevel? desiredConsistencyLevel;

internal CosmosAccountServiceConfiguration accountServiceConfiguration { get; private set; }
internal ClientTelemetry clientTelemetry { get; set; }

internal TelemetryToServiceHelper telemetryToServiceHelper { get; set; }

private ClientCollectionCache collectionCache;

Expand Down Expand Up @@ -564,11 +565,11 @@ public DocumentClient(Uri serviceEndpoint,
/// <summary>
/// Internal constructor purely for unit-testing
/// </summary>
internal DocumentClient(Uri serviceEndpoint, string authKey)
internal DocumentClient(Uri serviceEndpoint, ConnectionPolicy connectionPolicy)
{
// do nothing
this.ServiceEndpoint = serviceEndpoint;
this.ConnectionPolicy = new ConnectionPolicy();
this.ConnectionPolicy = connectionPolicy ?? new ConnectionPolicy();
}

internal virtual async Task<ClientCollectionCache> GetCollectionCacheAsync(ITrace trace)
Expand Down Expand Up @@ -660,7 +661,7 @@ private async Task OpenPrivateAsync(CancellationToken cancellationToken)
storeModel: this.GatewayStoreModel,
tokenProvider: this,
retryPolicy: this.retryPolicy,
clientTelemetry: this.clientTelemetry);
telemetryToServiceHelper: this.telemetryToServiceHelper);
this.partitionKeyRangeCache = new PartitionKeyRangeCache(this, this.GatewayStoreModel, this.collectionCache);

DefaultTrace.TraceWarning("{0} occurred while OpenAsync. Exception Message: {1}", ex.ToString(), ex.Message);
Expand Down Expand Up @@ -939,6 +940,15 @@ internal virtual void Initialize(Uri serviceEndpoint,
// Loading VM Information (non blocking call and initialization won't fail if this call fails)
VmMetadataApiHandler.TryInitialize(this.httpClient);

// Starting ClientTelemetry Job
this.telemetryToServiceHelper = TelemetryToServiceHelper.CreateAndInitializeClientConfigAndTelemetryJob(this.clientId,
this.ConnectionPolicy,
this.cosmosAuthorization,
this.httpClient,
this.ServiceEndpoint,
this.GlobalEndpointManager,
this.cancellationTokenSource);

if (sessionContainer != null)
{
this.sessionContainer = sessionContainer;
Expand All @@ -961,12 +971,6 @@ internal virtual void Initialize(Uri serviceEndpoint,
// For direct: WFStoreProxy [set in OpenAsync()].
this.eventSource = DocumentClientEventSource.Instance;

// Disable system usage for internal builds. Cosmos DB owns the VMs and already logs
// the system information so no need to track it.
#if !INTERNAL
this.InitializeClientTelemetry();
#endif

this.initializeTaskFactory = (_) => TaskHelper.InlineIfPossible<bool>(
() => this.GetInitializationTaskAsync(storeClientFactory: storeClientFactory),
new ResourceThrottleRetryPolicy(
Expand Down Expand Up @@ -1028,7 +1032,7 @@ private async Task<bool> GetInitializationTaskAsync(IStoreClientFactory storeCli
storeModel: this.GatewayStoreModel,
tokenProvider: this,
retryPolicy: this.retryPolicy,
clientTelemetry: this.clientTelemetry);
telemetryToServiceHelper: this.telemetryToServiceHelper);
this.partitionKeyRangeCache = new PartitionKeyRangeCache(this, this.GatewayStoreModel, this.collectionCache);
this.ResetSessionTokenRetryPolicy = new ResetSessionTokenRetryPolicyFactory(this.sessionContainer, this.collectionCache, this.retryPolicy);

Expand All @@ -1046,36 +1050,6 @@ private async Task<bool> GetInitializationTaskAsync(IStoreClientFactory storeCli
return true;
}

private void InitializeClientTelemetry()
{
if (this.ConnectionPolicy.EnableClientTelemetry)
{
try
{
this.clientTelemetry = ClientTelemetry.CreateAndStartBackgroundTelemetry(
clientId: this.clientId,
httpClient: this.httpClient,
userAgent: this.ConnectionPolicy.UserAgentContainer.BaseUserAgent,
connectionMode: this.ConnectionPolicy.ConnectionMode,
authorizationTokenProvider: this.cosmosAuthorization,
diagnosticsHelper: DiagnosticsHandlerHelper.Instance,
preferredRegions: this.ConnectionPolicy.PreferredLocations,
globalEndpointManager: this.GlobalEndpointManager);

DefaultTrace.TraceInformation("Client Telemetry Enabled.");
}
catch (Exception ex)
{
DefaultTrace.TraceInformation($"Error While starting Telemetry Job : {ex.Message}. Hence disabling Client Telemetry");
this.ConnectionPolicy.EnableClientTelemetry = false;
}
}
else
{
DefaultTrace.TraceInformation("Client Telemetry Disabled.");
}
}

private async Task InitializeCachesAsync(string databaseName, DocumentCollection collection, CancellationToken cancellationToken)
{
if (databaseName == null)
Expand Down Expand Up @@ -1279,6 +1253,12 @@ public void Dispose()
return;
}

if (this.telemetryToServiceHelper != null)
{
this.telemetryToServiceHelper.Dispose();
this.telemetryToServiceHelper = null;
}

if (!this.cancellationTokenSource.IsCancellationRequested)
{
this.cancellationTokenSource.Cancel();
Expand Down Expand Up @@ -1346,11 +1326,6 @@ public void Dispose()
this.initTaskCache = null;
}

if (this.clientTelemetry != null)
{
this.clientTelemetry.Dispose();
}

DefaultTrace.TraceInformation("DocumentClient with id {0} disposed.", this.traceId);
DefaultTrace.Flush();

Expand Down
1 change: 0 additions & 1 deletion Microsoft.Azure.Cosmos/src/Fluent/CosmosClientBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ namespace Microsoft.Azure.Cosmos.Fluent
using Microsoft.Azure.Cosmos.Core.Trace;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Client;
using Telemetry;

/// <summary>
/// This is a Builder class that creates a cosmos client
Expand Down
9 changes: 3 additions & 6 deletions Microsoft.Azure.Cosmos/src/Handler/ClientPipelineBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public ClientPipelineBuilder(
CosmosClient client,
ConsistencyLevel? requestedClientConsistencyLevel,
IReadOnlyCollection<RequestHandler> customHandlers,
ClientTelemetry telemetry)
TelemetryToServiceHelper telemetryToServiceHelper)
{
this.client = client ?? throw new ArgumentNullException(nameof(client));
this.requestedClientConsistencyLevel = requestedClientConsistencyLevel;
Expand All @@ -48,11 +48,8 @@ public ClientPipelineBuilder(
#else
this.diagnosticsHandler = null;
#endif
if (telemetry != null)
{
this.telemetryHandler = new TelemetryHandler(telemetry);
Debug.Assert(this.telemetryHandler.InnerHandler == null, nameof(this.telemetryHandler));
}
this.telemetryHandler = new TelemetryHandler(telemetryToServiceHelper);
Debug.Assert(this.telemetryHandler.InnerHandler == null, nameof(this.telemetryHandler));

this.UseRetryPolicy();
this.AddCustomHandlers(customHandlers);
Expand Down
38 changes: 21 additions & 17 deletions Microsoft.Azure.Cosmos/src/Handler/TelemetryHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,15 @@ namespace Microsoft.Azure.Cosmos.Handlers
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Core.Trace;
using Microsoft.Azure.Cosmos.Telemetry;
using Microsoft.Azure.Cosmos.Telemetry.Collector;

internal class TelemetryHandler : RequestHandler
{
private readonly ClientTelemetry telemetry;
private readonly TelemetryToServiceHelper telemetryToServiceHelper;

public TelemetryHandler(ClientTelemetry telemetry)
public TelemetryHandler(TelemetryToServiceHelper telemetryToServiceHelper)
{
this.telemetry = telemetry ?? throw new ArgumentNullException(nameof(telemetry));
this.telemetryToServiceHelper = telemetryToServiceHelper ?? throw new ArgumentNullException(nameof(telemetryToServiceHelper));
}

public override async Task<ResponseMessage> SendAsync(
Expand All @@ -29,19 +30,22 @@ public override async Task<ResponseMessage> SendAsync(
{
try
{
this.telemetry
.CollectOperationInfo(
cosmosDiagnostics: response.Diagnostics,
statusCode: response.StatusCode,
responseSizeInBytes: this.GetPayloadSize(response),
containerId: request.ContainerId,
databaseId: request.DatabaseId,
operationType: request.OperationType,
resourceType: request.ResourceType,
consistencyLevel: request.Headers?[Documents.HttpConstants.HttpHeaders.ConsistencyLevel],
requestCharge: response.Headers.RequestCharge,
subStatusCode: response.Headers.SubStatusCode,
trace: response.Trace);
this.telemetryToServiceHelper.GetCollector().CollectOperationAndNetworkInfo(
() => new TelemetryInformation
{
RegionsContactedList = response.Diagnostics.GetContactedRegions(),
RequestLatency = response.Diagnostics.GetClientElapsedTime(),
StatusCode = response.StatusCode,
ResponseSizeInBytes = TelemetryHandler.GetPayloadSize(response),
ContainerId = request.ContainerId,
DatabaseId = request.DatabaseId,
OperationType = request.OperationType,
ResourceType = request.ResourceType,
ConsistencyLevel = request.Headers?[Documents.HttpConstants.HttpHeaders.ConsistencyLevel],
RequestCharge = response.Headers.RequestCharge,
SubStatusCode = response.Headers.SubStatusCode,
Trace = response.Trace
});
}
catch (Exception ex)
{
Expand All @@ -63,7 +67,7 @@ private bool IsAllowed(RequestMessage request)
/// </summary>
/// <param name="response"></param>
/// <returns>Size of Payload</returns>
private long GetPayloadSize(ResponseMessage response)
private static long GetPayloadSize(ResponseMessage response)
{
if (response != null)
{
Expand Down
4 changes: 1 addition & 3 deletions Microsoft.Azure.Cosmos/src/Resource/ClientContextCore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ namespace Microsoft.Azure.Cosmos
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Core.Trace;
using Microsoft.Azure.Cosmos.Handler;
using Microsoft.Azure.Cosmos.Handlers;
using Microsoft.Azure.Cosmos.Resource.CosmosExceptions;
using Microsoft.Azure.Cosmos.Routing;
Expand Down Expand Up @@ -122,7 +120,7 @@ internal static CosmosClientContext Create(
cosmosClient,
clientOptions.ConsistencyLevel,
clientOptions.CustomHandlers,
telemetry: documentClient.clientTelemetry);
telemetryToServiceHelper: documentClient.telemetryToServiceHelper);

requestInvokerHandler = clientPipelineBuilder.Build();
}
Expand Down
42 changes: 17 additions & 25 deletions Microsoft.Azure.Cosmos/src/Routing/ClientCollectionCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ namespace Microsoft.Azure.Cosmos.Routing
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Common;
using Microsoft.Azure.Cosmos.Telemetry;
using Microsoft.Azure.Cosmos.Telemetry.Collector;
using Microsoft.Azure.Cosmos.Tracing;
using Microsoft.Azure.Cosmos.Tracing.TraceData;
using Microsoft.Azure.Documents;
Expand All @@ -25,20 +26,20 @@ internal class ClientCollectionCache : CollectionCache
private readonly ICosmosAuthorizationTokenProvider tokenProvider;
private readonly IRetryPolicyFactory retryPolicy;
private readonly ISessionContainer sessionContainer;
private readonly ClientTelemetry clientTelemetry;
private readonly TelemetryToServiceHelper telemetryToServiceHelper;

public ClientCollectionCache(
ISessionContainer sessionContainer,
IStoreModel storeModel,
ICosmosAuthorizationTokenProvider tokenProvider,
IRetryPolicyFactory retryPolicy,
ClientTelemetry clientTelemetry)
TelemetryToServiceHelper telemetryToServiceHelper)
{
this.storeModel = storeModel ?? throw new ArgumentNullException("storeModel");
this.tokenProvider = tokenProvider;
this.retryPolicy = retryPolicy;
this.sessionContainer = sessionContainer;
this.clientTelemetry = clientTelemetry;
this.telemetryToServiceHelper = telemetryToServiceHelper;
}

protected override Task<ContainerProperties> GetByRidAsync(string apiVersion,
Expand Down Expand Up @@ -214,21 +215,19 @@ private async Task<ContainerProperties> ReadCollectionAsync(
await this.storeModel.ProcessMessageAsync(request))
{
ContainerProperties containerProperties = CosmosResource.FromStream<ContainerProperties>(response);

if (this.clientTelemetry != null)
{
ClientCollectionCache.GetDatabaseAndCollectionName(collectionLink, out string databaseName, out string collectionName);
this.clientTelemetry.CollectCacheInfo(
cacheRefreshSource: ClientCollectionCache.TelemetrySourceName,
regionsContactedList: response.RequestStats.RegionsContacted,
requestLatency: response.RequestStats.RequestLatency,
statusCode: response.StatusCode,
containerId: collectionName,
operationType: request.OperationType,
resourceType: request.ResourceType,
subStatusCode: response.SubStatusCode,
databaseId: databaseName);
}

this.telemetryToServiceHelper.GetCollector().CollectCacheInfo(
ClientCollectionCache.TelemetrySourceName,
() => new TelemetryInformation
{
RegionsContactedList = response.RequestStats.RegionsContacted,
RequestLatency = response.RequestStats.RequestLatency,
StatusCode = response.StatusCode,
OperationType = request.OperationType,
ResourceType = request.ResourceType,
SubStatusCode = response.SubStatusCode,
CollectionLink = collectionLink
});

return containerProperties;
}
Expand All @@ -242,12 +241,5 @@ await this.storeModel.ProcessMessageAsync(request))
}
}
}

private static void GetDatabaseAndCollectionName(string path, out string databaseName, out string collectionName)
{
string[] segments = path.Split(new char[] { '/' }, StringSplitOptions.RemoveEmptyEntries);

PathsHelper.ParseDatabaseNameAndCollectionNameFromUrlSegments(segments, out databaseName, out collectionName);
}
}
}
Loading