Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade Resiliency: Refactors Implementation for Opening Rntbd Connections to Backend Replicas in Direct Mode. #3640

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<ClientOfficialVersion>3.31.2</ClientOfficialVersion>
<ClientPreviewVersion>3.31.2</ClientPreviewVersion>
<ClientPreviewSuffixVersion>preview</ClientPreviewSuffixVersion>
<DirectVersion>3.29.4</DirectVersion>
<DirectVersion>3.30.0</DirectVersion>
kundadebdatta marked this conversation as resolved.
Show resolved Hide resolved
<EncryptionOfficialVersion>2.0.0</EncryptionOfficialVersion>
<EncryptionPreviewVersion>2.0.0</EncryptionPreviewVersion>
<EncryptionPreviewSuffixVersion>preview</EncryptionPreviewSuffixVersion>
Expand Down
46 changes: 12 additions & 34 deletions Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,15 @@ internal class GatewayAddressCache : IAddressCache, IDisposable
private Tuple<PartitionKeyRangeIdentity, PartitionAddressInformation> masterPartitionAddressCache;
private DateTime suboptimalMasterPartitionTimestamp;
private bool disposedValue;
private IOpenConnectionsHandler openConnectionsHandler;

public GatewayAddressCache(
Uri serviceEndpoint,
Protocol protocol,
ICosmosAuthorizationTokenProvider tokenProvider,
IServiceConfigurationReader serviceConfigReader,
CosmosHttpClient httpClient,
IOpenConnectionsHandler openConnectionsHandler,
long suboptimalPartitionForceRefreshIntervalInSeconds = 600,
bool enableTcpConnectionEndpointRediscovery = false)
{
Expand All @@ -80,6 +82,8 @@ public GatewayAddressCache(
GatewayAddressCache.protocolFilterFormat,
Constants.Properties.Protocol,
GatewayAddressCache.ProtocolString(this.protocol));

this.openConnectionsHandler = openConnectionsHandler;
}

public Uri ServiceEndpoint => this.serviceEndpoint;
Expand All @@ -88,7 +92,7 @@ public async Task OpenConnectionsAsync(
string databaseName,
ContainerProperties collection,
IReadOnlyList<PartitionKeyRangeIdentity> partitionKeyRangeIdentities,
Func<Uri, Task> openConnectionHandler,
bool shouldOpenRntbdChannels,
kundadebdatta marked this conversation as resolved.
Show resolved Hide resolved
CancellationToken cancellationToken)
{
List<Task<TryCatch<DocumentServiceResponse>>> tasks = new ();
Expand Down Expand Up @@ -157,47 +161,21 @@ public async Task OpenConnectionsAsync(
new PartitionKeyRangeIdentity(collection.ResourceId, addressInfo.Item1.PartitionKeyRangeId),
addressInfo.Item2);

if (openConnectionHandler != null)
if (this.openConnectionsHandler != null && shouldOpenRntbdChannels)
{
await this.OpenRntbdChannelsAsync(
addressInfo,
openConnectionHandler);
await this.openConnectionsHandler
.TryOpenRntbdChannelsAsync(
addresses: addressInfo.Item2.Get(Protocol.Tcp)?.ReplicaTransportAddressUris);
}
}
}
}
}

/// <summary>
/// Invokes the transport client delegate to open the Rntbd connection
/// and establish Rntbd context negotiation to the backend replica nodes.
/// </summary>
/// <param name="addressInfo">An instance of <see cref="Tuple{T1, T2}"/> containing the partition key id
/// and it's corresponding address information.</param>
/// <param name="openConnectionHandlerAsync">The transport client callback delegate to be invoked at a
/// later point of time.</param>
private async Task OpenRntbdChannelsAsync(
Tuple<PartitionKeyRangeIdentity, PartitionAddressInformation> addressInfo,
Func<Uri, Task> openConnectionHandlerAsync)
/// <inheritdoc/>
public void SetOpenConnectionsHandler(IOpenConnectionsHandler openConnectionsHandler)
{
foreach (AddressInformation address in addressInfo.Item2.AllAddresses)
{
DefaultTrace.TraceVerbose("Attempting to open Rntbd connection to backend uri: {0}. '{1}'",
address.PhysicalUri,
System.Diagnostics.Trace.CorrelationManager.ActivityId);
try
{
await openConnectionHandlerAsync(
new Uri(address.PhysicalUri));
}
catch (Exception ex)
{
DefaultTrace.TraceWarning("Failed to open Rntbd connection to backend uri: {0} with exception: {1}. '{2}'",
address.PhysicalUri,
ex,
System.Diagnostics.Trace.CorrelationManager.ActivityId);
}
}
this.openConnectionsHandler = openConnectionsHandler;
}

public async Task<PartitionAddressInformation> TryGetAddressesAsync(
Expand Down
24 changes: 19 additions & 5 deletions Microsoft.Azure.Cosmos/src/Routing/GlobalAddressResolver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ internal sealed class GlobalAddressResolver : IAddressResolverExtension, IDispos
private readonly CosmosHttpClient httpClient;
private readonly ConcurrentDictionary<Uri, EndpointCache> addressCacheByEndpoint;
private readonly bool enableTcpConnectionEndpointRediscovery;
private IOpenConnectionsHandler openConnectionsHandler;

public GlobalAddressResolver(
GlobalEndpointManager endpointManager,
Expand Down Expand Up @@ -108,24 +109,22 @@ public async Task OpenAsync(
databaseName: databaseName,
collection: collection,
partitionKeyRangeIdentities: ranges,
openConnectionHandler: null,
shouldOpenRntbdChannels: false,
cancellationToken: cancellationToken));
}

await Task.WhenAll(tasks);
}

/// <summary>
/// Invokes the gateway address cache and passes the <see cref="Documents.Rntbd.TransportClient"/> deligate to be invoked from the same.
/// Invokes the gateway address cache to open the rntbd connections to the backend replicas.
/// </summary>
/// <param name="databaseName">A string containing the name of the database.</param>
/// <param name="containerLinkUri">A string containing the container's link uri.</param>
/// <param name="openConnectionHandlerAsync">The transport client callback delegate to be invoked at a later point of time.</param>
/// <param name="cancellationToken">An Instance of the <see cref="CancellationToken"/>.</param>
public async Task OpenConnectionsToAllReplicasAsync(
string databaseName,
string containerLinkUri,
Func<Uri, Task> openConnectionHandlerAsync,
CancellationToken cancellationToken = default)
{
try
Expand Down Expand Up @@ -180,7 +179,7 @@ await this.addressCacheByEndpoint[firstPreferredReadRegion]
databaseName: databaseName,
collection: collection,
partitionKeyRangeIdentities: partitionKeyRangeIdentities,
openConnectionHandler: openConnectionHandlerAsync,
shouldOpenRntbdChannels: true,
cancellationToken: cancellationToken);

}
Expand All @@ -197,6 +196,20 @@ await this.addressCacheByEndpoint[firstPreferredReadRegion]
}
}

/// <inheritdoc/>
public void SetOpenConnectionsHandler(IOpenConnectionsHandler openConnectionsHandler)
{
this.openConnectionsHandler = openConnectionsHandler;

// Sets the openConnectionsHandler for the existing address cache.
// For the new address caches added later, the openConnectionsHandler
// will be set through the constructor.
foreach (EndpointCache endpointCache in this.addressCacheByEndpoint.Values)
{
endpointCache.AddressCache.SetOpenConnectionsHandler(openConnectionsHandler);
}
}

public async Task<PartitionAddressInformation> ResolveAsync(
DocumentServiceRequest request,
bool forceRefresh,
Expand Down Expand Up @@ -284,6 +297,7 @@ private EndpointCache GetOrAddEndpoint(Uri endpoint)
this.tokenProvider,
this.serviceConfigReader,
this.httpClient,
this.openConnectionsHandler,
enableTcpConnectionEndpointRediscovery: this.enableTcpConnectionEndpointRediscovery);

string location = this.endpointManager.GetLocation(endpoint);
Expand Down
7 changes: 7 additions & 0 deletions Microsoft.Azure.Cosmos/src/Routing/IAddressCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,12 @@ Task<PartitionAddressInformation> TryGetAddressesAsync(
ServiceIdentity serviceIdentity,
bool forceRefreshPartitionAddresses,
CancellationToken cancellationToken);

/// <summary>
/// Sets the <see cref="IOpenConnectionsHandler"/> instance to a global readonly
/// field for invoking the open connection request at a later point of time.
/// </summary>
/// <param name="openConnectionsHandler">An instance of <see cref="IOpenConnectionsHandler"/>.</param>
void SetOpenConnectionsHandler(IOpenConnectionsHandler openConnectionsHandler);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Reflection;
using System.Threading;
Expand Down Expand Up @@ -102,13 +103,13 @@ public override async Task<ResponseMessage> SendAsync(RequestMessage request, Ca
private void ValidateLazyHeadersAreNotCreated(CosmosMessageHeadersInternal internalHeaders)
{
RequestNameValueCollection storeRequestHeaders = (RequestNameValueCollection)internalHeaders.INameValueCollection;
FieldInfo lazyHeaders = typeof(Documents.Collections.RequestNameValueCollection).GetField("lazyNotCommonHeaders", System.Reflection.BindingFlags.Instance | System.Reflection.BindingFlags.NonPublic);
Lazy<Dictionary<string, string>> lazyNotCommonHeaders = (Lazy<Dictionary<string, string>>)lazyHeaders.GetValue(storeRequestHeaders);
FieldInfo fieldInfo = storeRequestHeaders.GetType().GetField("notCommonHeaders", BindingFlags.Instance | BindingFlags.NonPublic);
Dictionary<string, string> notCommonHeaders = (Dictionary<string, string>)fieldInfo.GetValue(storeRequestHeaders);

// Use the if instead of Assert.IsFalse to avoid creating the dictionary in the error message
if (lazyNotCommonHeaders.IsValueCreated)
if (notCommonHeaders != null && notCommonHeaders.Any())
{
Assert.Fail($"The lazy dictionary should not be created. Please add the following headers to the {nameof(Documents.Collections.RequestNameValueCollection)}: {JsonConvert.SerializeObject(lazyNotCommonHeaders.Value)}");
Assert.Fail($"The lazy dictionary should not be created. Please add the following headers to the {nameof(Documents.Collections.RequestNameValueCollection)}: {JsonConvert.SerializeObject(notCommonHeaders)}");
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,33 +144,10 @@ private static void ThrowTransportExceptionOnItemOperation(
userPayload: true,
payloadSent: false);

DocumentClientException documentClientException = new DocumentClientException(
message: "Exception",
innerException: transportException,
statusCode: System.Net.HttpStatusCode.Gone);
IClientSideRequestStatistics requestStatistics = request.RequestContext.ClientRequestStatistics;
requestStatistics.RecordResponse(
request,
new StoreResult(
storeResponse: null,
exception: documentClientException,
partitionKeyRangeId: "PkRange",
lsn: 42,
quorumAckedLsn: 4242,
requestCharge: 9000.42,
currentReplicaSetSize: 3,
currentWriteQuorum: 4,
isValid: true,
storePhysicalAddress: physicalAddress,
globalCommittedLSN: 2,
numberOfReadRegions: 1,
itemLSN: 5,
sessionToken: null,
usingLocalLSN: true,
activityId: Guid.NewGuid().ToString(),
backendRequestDurationInMs: "0",
retryAfterInMs: "42",
transportRequestStats: new TransportRequestStats()),
StoreResult.CreateForTesting(transportRequestStats: new TransportRequestStats()).Target,
DateTime.MinValue,
DateTime.MaxValue);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,26 +214,7 @@
StoreResponseStatistics storeResponseStatistics = new StoreResponseStatistics(
DateTime.MinValue,
DateTime.MaxValue,
new Documents.StoreResult(
storeResponse: new StoreResponse(),
exception: null,
partitionKeyRangeId: 42.ToString(),
lsn: 1337,
quorumAckedLsn: 23,
requestCharge: 3.14,
currentReplicaSetSize: 4,
currentWriteQuorum: 3,
isValid: true,
storePhysicalAddress: new Uri("http://storephysicaladdress.com"),
globalCommittedLSN: 1234,
numberOfReadRegions: 13,
itemLSN: 15,
sessionToken: new SimpleSessionToken(42),
usingLocalLSN: true,
activityId: Guid.Empty.ToString(),
backendRequestDurationInMs: "4.2",
retryAfterInMs: "42",
transportRequestStats: TraceWriterBaselineTests.CreateTransportRequestStats()),
StoreResult.CreateForTesting(transportRequestStats: TraceWriterBaselineTests.CreateTransportRequestStats()).Target,
ResourceType.Document,
OperationType.Query,
"42",
Expand Down Expand Up @@ -357,7 +338,7 @@
"IsValid": true,
"StorePhysicalAddress": "http://storephysicaladdress.com/",
"RequestCharge": 3.14,
"RetryAfterInMs": "42",
"RetryAfterInMs": "9000",
"BELatencyInMs": "4.2",
"transportRequestTimeline": {
"requestTimeline": [
Expand Down Expand Up @@ -439,26 +420,7 @@
StoreResponseStatistics storeResponseStatistics = new StoreResponseStatistics(
requestStartTime: default,
requestResponseTime: default,
new Documents.StoreResult(
storeResponse: new StoreResponse(),
exception: default,
partitionKeyRangeId: default,
lsn: default,
quorumAckedLsn: default,
requestCharge: default,
currentReplicaSetSize: default,
currentWriteQuorum: default,
isValid: default,
storePhysicalAddress: default,
globalCommittedLSN: default,
numberOfReadRegions: default,
itemLSN: default,
sessionToken: default,
usingLocalLSN: default,
activityId: default,
retryAfterInMs: default,
backendRequestDurationInMs: default,
transportRequestStats: TraceWriterBaselineTests.CreateTransportRequestStats()),
StoreResult.CreateForTesting(storeResponse: new StoreResponse()).Target,
resourceType: default,
operationType: default,
requestSessionToken: default,
Expand Down Expand Up @@ -567,55 +529,7 @@
"RequestCharge": 0,
"RetryAfterInMs": null,
"BELatencyInMs": null,
"transportRequestTimeline": {
"requestTimeline": [
{
"event": "Created",
"startTimeUtc": "2021-12-31T23:59:59.059Z",
"durationInMs": 1
},
{
"event": "ChannelAcquisitionStarted",
"startTimeUtc": "2021-12-31T23:59:59.06Z",
"durationInMs": 0
},
{
"event": "Pipelined",
"startTimeUtc": "2021-12-31T23:59:59.06Z",
"durationInMs": 0
},
{
"event": "Transit Time",
"startTimeUtc": "2021-12-31T23:59:59.06Z",
"durationInMs": 0
},
{
"event": "Received",
"startTimeUtc": "2021-12-31T23:59:59.06Z",
"durationInMs": 0
},
{
"event": "Completed",
"startTimeUtc": "2021-12-31T23:59:59.06Z",
"durationInMs": 0
}
],
"serviceEndpointStats": {
"inflightRequests": 2,
"openConnections": 1
},
"connectionStats": {
"waitforConnectionInit": "True",
"callsPendingReceive": 1,
"lastSendAttempt": "2021-12-31T23:59:59.059Z",
"lastSend": "2021-12-31T23:59:59.059Z",
"lastReceive": "2021-12-31T23:59:59.059Z"
},
"requestSizeInBytes": 2,
"requestBodySizeInBytes": 1,
"responseMetadataSizeInBytes": 1,
"responseBodySizeInBytes": 1
},
"transportRequestTimeline": null,
"TransportException": null
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,6 @@ public Task UpdateAsync(IReadOnlyList<AddressCacheToken> addressCacheTokens, Can
public Task OpenConnectionsToAllReplicasAsync(
string databaseName,
string containerLinkUri,
Func<Uri, Task> openConnectionHandlerAsync,
CancellationToken cancellationToken = default)
{
throw new NotImplementedException();
Expand All @@ -531,6 +530,12 @@ public Task UpdateAsync(Documents.Rntbd.ServerKey serverKey, CancellationToken c
{
throw new NotImplementedException();
}

public void SetOpenConnectionsHandler(
IOpenConnectionsHandler openConnectionHandler)
{
throw new NotImplementedException();
}
}

private class MockTransportClient : TransportClient
Expand Down
Loading