Skip to content

Commit f7a4c56

Browse files
[Internal] Upgrade Resiliency: Fixes Duplicate Channel and Task Creation. (#4123)
* Code changes to fix duplicate channel and thread pool on refresh flow. * Code changes to fix failed tests. * Code changes to add global semaphore for concurrency control in address cache. * Code changes to refactor the refresh async method. * Code changes to address review comments. * Code changes to update summary.
1 parent f379919 commit f7a4c56

File tree

6 files changed

+271
-101
lines changed

6 files changed

+271
-101
lines changed

Microsoft.Azure.Cosmos/src/Routing/AsyncCacheNonBlocking.cs

+14-10
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ namespace Microsoft.Azure.Cosmos
99
using System.Threading;
1010
using System.Threading.Tasks;
1111
using Microsoft.Azure.Cosmos.Core.Trace;
12-
using Microsoft.Azure.Cosmos.Tracing.TraceData;
1312

1413
/// <summary>
1514
/// This is a thread safe AsyncCache that allows refreshing values in the background.
@@ -179,21 +178,26 @@ public bool TryRemove(TKey key)
179178

180179
/// <summary>
181180
/// Refreshes the async non blocking cache on-demand for the given <paramref name="key"/>
182-
/// and caches the result for later usage.
181+
/// and caches the result for later usage. Note that this method doesn't control the number
182+
/// of tasks created in parallel, so the concurrency needs to be controlled by the caller.
183183
/// </summary>
184184
/// <param name="key">The requested key to be refreshed.</param>
185185
/// <param name="singleValueInitFunc">A func delegate to be invoked at a later point of time.</param>
186-
public async Task RefreshAsync(
186+
public void Refresh(
187187
TKey key,
188188
Func<TValue, Task<TValue>> singleValueInitFunc)
189189
{
190190
if (this.values.TryGetValue(key, out AsyncLazyWithRefreshTask<TValue> initialLazyValue))
191191
{
192-
await this.UpdateCacheAndGetValueFromBackgroundTaskAsync(
193-
key: key,
194-
initialValue: initialLazyValue,
195-
callbackDelegate: singleValueInitFunc,
196-
operationName: nameof(RefreshAsync));
192+
Task backgroundRefreshTask = this.GetAsync(
193+
key: key,
194+
singleValueInitFunc: singleValueInitFunc,
195+
forceRefresh: (_) => true);
196+
197+
Task continuationTask = backgroundRefreshTask
198+
.ContinueWith(
199+
task => DefaultTrace.TraceVerbose("Failed to refresh addresses in the background with exception: {0}", task.Exception),
200+
TaskContinuationOptions.OnlyOnFaulted);
197201
}
198202
}
199203

@@ -250,8 +254,8 @@ private sealed class AsyncLazyWithRefreshTask<T>
250254
{
251255
private readonly CancellationToken cancellationToken;
252256
private readonly Func<T, Task<T>> createValueFunc;
253-
private readonly object valueLock = new object();
254-
private readonly object removedFromCacheLock = new object();
257+
private readonly object valueLock = new ();
258+
private readonly object removedFromCacheLock = new ();
255259

256260
private bool removedFromCache = false;
257261
private Task<T> value;

Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs

+39-11
Original file line numberDiff line numberDiff line change
@@ -50,12 +50,14 @@ internal class GatewayAddressCache : IAddressCache, IDisposable
5050
private readonly ICosmosAuthorizationTokenProvider tokenProvider;
5151
private readonly bool enableTcpConnectionEndpointRediscovery;
5252

53+
private readonly SemaphoreSlim semaphore;
5354
private readonly CosmosHttpClient httpClient;
5455
private readonly bool isReplicaAddressValidationEnabled;
5556

5657
private Tuple<PartitionKeyRangeIdentity, PartitionAddressInformation> masterPartitionAddressCache;
5758
private DateTime suboptimalMasterPartitionTimestamp;
5859
private bool disposedValue;
60+
private bool validateUnknownReplicas;
5961
private IOpenConnectionsHandler openConnectionsHandler;
6062

6163
public GatewayAddressCache(
@@ -90,8 +92,10 @@ public GatewayAddressCache(
9092
Constants.Properties.Protocol,
9193
GatewayAddressCache.ProtocolString(this.protocol));
9294

95+
this.semaphore = new SemaphoreSlim(1, 1);
9396
this.openConnectionsHandler = openConnectionsHandler;
9497
this.isReplicaAddressValidationEnabled = replicaAddressValidationEnabled;
98+
this.validateUnknownReplicas = false;
9599
}
96100

97101
public Uri ServiceEndpoint => this.serviceEndpoint;
@@ -120,6 +124,14 @@ public async Task OpenConnectionsAsync(
120124
List<Task> tasks = new ();
121125
int batchSize = GatewayAddressCache.DefaultBatchSize;
122126

127+
// By design, the Unknown replicas are validated only when the following two conditions are met:
128+
// 1) The CosmosClient is initiated using the CreateAndInitializeAsync() flow.
129+
// 2) The advanced replica selection feature is enabled.
130+
if (shouldOpenRntbdChannels)
131+
{
132+
this.validateUnknownReplicas = true;
133+
}
134+
123135
#if !(NETSTANDARD15 || NETSTANDARD16)
124136
#if NETSTANDARD20
125137
// GetEntryAssembly returns null when loaded from native netstandard2.0
@@ -302,11 +314,12 @@ public async Task<PartitionAddressInformation> TryGetAddressesAsync(
302314
.ReplicaTransportAddressUris
303315
.Any(x => x.ShouldRefreshHealthStatus()))
304316
{
305-
Task refreshAddressesInBackgroundTask = Task.Run(async () =>
317+
bool slimAcquired = await this.semaphore.WaitAsync(0);
318+
try
306319
{
307-
try
320+
if (slimAcquired)
308321
{
309-
await this.serverPartitionAddressCache.RefreshAsync(
322+
this.serverPartitionAddressCache.Refresh(
310323
key: partitionKeyRangeIdentity,
311324
singleValueInitFunc: (currentCachedValue) => this.GetAddressesForRangeIdAsync(
312325
request,
@@ -315,14 +328,21 @@ await this.serverPartitionAddressCache.RefreshAsync(
315328
partitionKeyRangeIdentity.PartitionKeyRangeId,
316329
forceRefresh: true));
317330
}
318-
catch (Exception ex)
331+
else
319332
{
320-
DefaultTrace.TraceWarning("Failed to refresh addresses in the background for the collection rid: {0} with exception: {1}. '{2}'",
333+
DefaultTrace.TraceVerbose("Failed to refresh addresses in the background for the collection rid: {0}, partition key range id: {1}, because the semaphore is already acquired. '{2}'",
321334
partitionKeyRangeIdentity.CollectionRid,
322-
ex,
335+
partitionKeyRangeIdentity.PartitionKeyRangeId,
323336
System.Diagnostics.Trace.CorrelationManager.ActivityId);
324337
}
325-
});
338+
}
339+
finally
340+
{
341+
if (slimAcquired)
342+
{
343+
this.semaphore.Release();
344+
}
345+
}
326346
}
327347

328348
return addresses;
@@ -1008,18 +1028,26 @@ private static PartitionAddressInformation MergeAddresses(
10081028
/// Returns a list of <see cref="TransportAddressUri"/> needed to validate their health status. Validating
10091029
/// a uri is done by opening Rntbd connection to the backend replica, which is a costly operation by nature. Therefore
10101030
/// validating both Unhealthy and Unknown replicas at the same time could impose high CPU utilization. To avoid this
1011-
/// situation, the RntbdOpenConnectionHandler has good concurrency control mechanism to open the connections gracefully/>.
1031+
/// situation, the RntbdOpenConnectionHandler has good concurrency control mechanism to open the connections gracefully.
1032+
/// By default, this method only returns the Unhealthy replicas whose connectivity status needs to be validated. The
1033+
/// Unknown replicas are validated only when the CosmosClient is initiated using the CreateAndInitializeAsync() flow.
10121034
/// </summary>
10131035
/// <param name="transportAddresses">A read only list of <see cref="TransportAddressUri"/>s.</param>
10141036
/// <returns>A list of <see cref="TransportAddressUri"/>s that need their status validated.</returns>
10151037
private IEnumerable<TransportAddressUri> GetAddressesNeededToValidateStatus(
10161038
IReadOnlyList<TransportAddressUri> transportAddresses)
10171039
{
1018-
return transportAddresses
1019-
.Where(address => address
1040+
return this.validateUnknownReplicas
1041+
? transportAddresses
1042+
.Where(address => address
1043+
.GetCurrentHealthState()
1044+
.GetHealthStatus() is
1045+
TransportAddressHealthState.HealthStatus.UnhealthyPending or
1046+
TransportAddressHealthState.HealthStatus.Unknown)
1047+
: transportAddresses
1048+
.Where(address => address
10201049
.GetCurrentHealthState()
10211050
.GetHealthStatus() is
1022-
TransportAddressHealthState.HealthStatus.Unknown or
10231051
TransportAddressHealthState.HealthStatus.UnhealthyPending);
10241052
}
10251053

Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosBadReplicaTests.cs

+9-5
Original file line numberDiff line numberDiff line change
@@ -71,11 +71,11 @@ public async Task TestGoneFromServiceScenarioAsync(
7171
"44444444444444444",
7272
};
7373

74-
HttpResponseMessage replicaSet1 = MockSetupsHelper.CreateAddresses(
75-
replicaIds1,
76-
partitionKeyRanges.First(),
77-
"eastus",
78-
cRid);
74+
HttpResponseMessage replicaSet1 = MockSetupsHelper.CreateAddresses(
75+
replicaIds1,
76+
partitionKeyRanges.First(),
77+
"eastus",
78+
cRid);
7979

8080
// One replica changed on the refresh
8181
List<string> replicaIds2 = new List<string>()
@@ -176,6 +176,10 @@ public async Task TestGoneFromServiceScenarioAsync(
176176
mockTransportClient.VerifyAll();
177177
mockHttpHandler.VerifyAll();
178178

179+
mockTransportClient
180+
.Setup(x => x.OpenConnectionAsync(It.IsAny<Uri>()))
181+
.Returns(Task.CompletedTask);
182+
179183
Documents.TransportAddressUri failedReplica = urisVisited.First();
180184

181185
// With replica validation enabled in preview mode, the failed replica will be validated as a part of the flow,

0 commit comments

Comments
 (0)