diff --git a/Microsoft.Azure.Cosmos/src/DocumentClient.cs b/Microsoft.Azure.Cosmos/src/DocumentClient.cs index 00fb2228c1..6448d9ea9f 100644 --- a/Microsoft.Azure.Cosmos/src/DocumentClient.cs +++ b/Microsoft.Azure.Cosmos/src/DocumentClient.cs @@ -1064,7 +1064,9 @@ private async Task GetInitializationTaskAsync(IStoreClientFactory storeCli (Cosmos.ConsistencyLevel)this.accountServiceConfiguration.DefaultConsistencyLevel, this.eventSource, this.serializerSettings, - this.httpClient); + this.httpClient, + this.PartitionKeyRangeLocation, + isPartitionLevelFailoverEnabled: this.ConnectionPolicy.EnablePartitionLevelFailover || this.ConnectionPolicy.EnablePartitionLevelCircuitBreaker); this.GatewayStoreModel = gatewayStoreModel; diff --git a/Microsoft.Azure.Cosmos/src/GatewayStoreClient.cs b/Microsoft.Azure.Cosmos/src/GatewayStoreClient.cs index 6bbce66092..44603e8a43 100644 --- a/Microsoft.Azure.Cosmos/src/GatewayStoreClient.cs +++ b/Microsoft.Azure.Cosmos/src/GatewayStoreClient.cs @@ -22,6 +22,7 @@ namespace Microsoft.Azure.Cosmos internal class GatewayStoreClient : TransportClient { + private readonly bool isPartitionLevelFailoverEnabled; private readonly ICommunicationEventSource eventSource; protected readonly CosmosHttpClient httpClient; protected readonly JsonSerializerSettings SerializerSettings; @@ -31,11 +32,13 @@ internal class GatewayStoreClient : TransportClient public GatewayStoreClient( CosmosHttpClient httpClient, ICommunicationEventSource eventSource, - JsonSerializerSettings serializerSettings = null) + JsonSerializerSettings serializerSettings = null, + bool isPartitionLevelFailoverEnabled = false) { this.httpClient = httpClient; this.SerializerSettings = serializerSettings; this.eventSource = eventSource; + this.isPartitionLevelFailoverEnabled = isPartitionLevelFailoverEnabled; } public async Task InvokeAsync( @@ -375,7 +378,9 @@ private Task InvokeClientAsync( return this.httpClient.SendHttpAsync( () => this.PrepareRequestMessageAsync(request, 
physicalAddress), resourceType, - HttpTimeoutPolicy.GetTimeoutPolicy(request), + HttpTimeoutPolicy.GetTimeoutPolicy( + request, + this.isPartitionLevelFailoverEnabled), request.RequestContext.ClientRequestStatistics, cancellationToken, request); diff --git a/Microsoft.Azure.Cosmos/src/GatewayStoreModel.cs b/Microsoft.Azure.Cosmos/src/GatewayStoreModel.cs index 258aa60902..6a0f33131f 100644 --- a/Microsoft.Azure.Cosmos/src/GatewayStoreModel.cs +++ b/Microsoft.Azure.Cosmos/src/GatewayStoreModel.cs @@ -24,7 +24,9 @@ namespace Microsoft.Azure.Cosmos // Marking it as non-sealed in order to unit test it using Moq framework internal class GatewayStoreModel : IStoreModelExtension, IDisposable { + private readonly bool isPartitionLevelFailoverEnabled; private static readonly string sessionConsistencyAsString = ConsistencyLevel.Session.ToString(); + private readonly GlobalPartitionEndpointManager globalPartitionEndpointManager; internal readonly GlobalEndpointManager endpointManager; private readonly DocumentClientEventSource eventSource; @@ -43,17 +45,24 @@ public GatewayStoreModel( ConsistencyLevel defaultConsistencyLevel, DocumentClientEventSource eventSource, JsonSerializerSettings serializerSettings, - CosmosHttpClient httpClient) + CosmosHttpClient httpClient, + GlobalPartitionEndpointManager globalPartitionEndpointManager, + bool isPartitionLevelFailoverEnabled = false) { + this.isPartitionLevelFailoverEnabled = isPartitionLevelFailoverEnabled; this.endpointManager = endpointManager; this.sessionContainer = sessionContainer; this.defaultConsistencyLevel = defaultConsistencyLevel; this.eventSource = eventSource; - + this.globalPartitionEndpointManager = globalPartitionEndpointManager; this.gatewayStoreClient = new GatewayStoreClient( httpClient, this.eventSource, - serializerSettings); + serializerSettings, + isPartitionLevelFailoverEnabled); + + this.globalPartitionEndpointManager.SetBackgroundConnectionPeriodicRefreshTask( + this.MarkEndpointsToHealthyAsync); } 
public virtual async Task ProcessMessageAsync(DocumentServiceRequest request, CancellationToken cancellationToken = default) @@ -69,12 +78,29 @@ await GatewayStoreModel.ApplySessionTokenAsync( DocumentServiceResponse response; try { - Uri physicalAddress = GatewayStoreClient.IsFeedRequest(request.OperationType) ? this.GetFeedUri(request) : this.GetEntityUri(request); // Collect region name only for document resources if (request.ResourceType.Equals(ResourceType.Document) && this.endpointManager.TryGetLocationForGatewayDiagnostics(request.RequestContext.LocationEndpointToRoute, out string regionName)) { request.RequestContext.RegionName = regionName; } + + // This is applicable for both per partition automatic failover and per partition circuit breaker. + if (this.isPartitionLevelFailoverEnabled + && !ReplicatedResourceClient.IsMasterResource(request.ResourceType) + && request.ResourceType.IsPartitioned()) + { + (bool isSuccess, PartitionKeyRange partitionKeyRange) = await TryResolvePartitionKeyRangeAsync( + request: request, + sessionContainer: this.sessionContainer, + partitionKeyRangeCache: this.partitionKeyRangeCache, + clientCollectionCache: this.clientCollectionCache, + refreshCache: false); + + request.RequestContext.ResolvedPartitionKeyRange = partitionKeyRange; + this.globalPartitionEndpointManager.TryAddPartitionLevelLocationOverride(request); + } + + Uri physicalAddress = GatewayStoreClient.IsFeedRequest(request.OperationType) ? this.GetFeedUri(request) : this.GetEntityUri(request); response = await this.gatewayStoreClient.InvokeAsync(request, request.ResourceType, physicalAddress, cancellationToken); } catch (DocumentClientException exception) @@ -422,6 +448,30 @@ private static async Task> TryResolvePartitionKey return new Tuple(true, partitonKeyRange); } + /// + /// Attempts to mark the unhealthy endpoints for a faulty partition to healthy state, un-deterministically. 
This is done + /// specifically for the gateway mode to get the faulty partition failed back to the original location. + /// + /// A dictionary mapping partition key ranges to their corresponding collection resource ID, original failed location, and health status. + public Task MarkEndpointsToHealthyAsync( + Dictionary> pkRangeUriMappings) + { + foreach (PartitionKeyRange pkRange in pkRangeUriMappings?.Keys) + { + string collectionRid = pkRangeUriMappings[pkRange].Item1; + Uri originalFailedLocation = pkRangeUriMappings[pkRange].Item2; + + DefaultTrace.TraceVerbose("Un-deterministically marking the original failed endpoint: {0}, for the PkRange: {1}, collectionRid: {2} back to healthy.", + originalFailedLocation, + pkRange.Id, + collectionRid); + + pkRangeUriMappings[pkRange] = new Tuple(collectionRid, originalFailedLocation, TransportAddressHealthState.HealthStatus.Connected); + } + + return Task.CompletedTask; + } + // DEVNOTE: This can be replace with ReplicatedResourceClient.IsMasterOperation on next Direct sync internal static bool IsMasterOperation( ResourceType resourceType, diff --git a/Microsoft.Azure.Cosmos/src/HttpClient/HttpTimeoutPolicy.cs b/Microsoft.Azure.Cosmos/src/HttpClient/HttpTimeoutPolicy.cs index 909cd88476..e514508c32 100644 --- a/Microsoft.Azure.Cosmos/src/HttpClient/HttpTimeoutPolicy.cs +++ b/Microsoft.Azure.Cosmos/src/HttpClient/HttpTimeoutPolicy.cs @@ -20,7 +20,8 @@ internal abstract class HttpTimeoutPolicy public virtual bool ShouldThrow503OnTimeout => false; public static HttpTimeoutPolicy GetTimeoutPolicy( - DocumentServiceRequest documentServiceRequest) + DocumentServiceRequest documentServiceRequest, + bool isPartitionLevelFailoverEnabled = false) { //Query Plan Requests if (documentServiceRequest.ResourceType == ResourceType.Document @@ -45,7 +46,9 @@ public static HttpTimeoutPolicy GetTimeoutPolicy( //Data Plane Read if (!HttpTimeoutPolicy.IsMetaData(documentServiceRequest) && documentServiceRequest.IsReadOnlyRequest) { - return 
HttpTimeoutPolicyDefault.InstanceShouldThrow503OnTimeout; + return isPartitionLevelFailoverEnabled + ? HttpTimeoutPolicyForPartitionFailover.InstanceShouldThrow503OnTimeout + : HttpTimeoutPolicyDefault.InstanceShouldThrow503OnTimeout; } //Meta Data Read diff --git a/Microsoft.Azure.Cosmos/src/HttpClient/HttpTimeoutPolicyForPartitionFailover.cs b/Microsoft.Azure.Cosmos/src/HttpClient/HttpTimeoutPolicyForPartitionFailover.cs new file mode 100644 index 0000000000..3e623a5aed --- /dev/null +++ b/Microsoft.Azure.Cosmos/src/HttpClient/HttpTimeoutPolicyForPartitionFailover.cs @@ -0,0 +1,52 @@ +//------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +//------------------------------------------------------------ +namespace Microsoft.Azure.Cosmos +{ + using System; + using System.Collections.Generic; + using System.Net.Http; + + internal sealed class HttpTimeoutPolicyForPartitionFailover : HttpTimeoutPolicy + { + public static readonly HttpTimeoutPolicy Instance = new HttpTimeoutPolicyForPartitionFailover(false); + public static readonly HttpTimeoutPolicy InstanceShouldThrow503OnTimeout = new HttpTimeoutPolicyForPartitionFailover(true); + public bool shouldThrow503OnTimeout; + private static readonly string Name = nameof(HttpTimeoutPolicyDefault); + + private HttpTimeoutPolicyForPartitionFailover(bool shouldThrow503OnTimeout) + { + this.shouldThrow503OnTimeout = shouldThrow503OnTimeout; + } + + private readonly IReadOnlyList<(TimeSpan requestTimeout, TimeSpan delayForNextRequest)> TimeoutsAndDelays = new List<(TimeSpan requestTimeout, TimeSpan delayForNextRequest)>() + { + (TimeSpan.FromSeconds(.5), TimeSpan.Zero), + (TimeSpan.FromSeconds(.5), TimeSpan.Zero), + (TimeSpan.FromSeconds(1), TimeSpan.Zero), + }; + + public override string TimeoutPolicyName => HttpTimeoutPolicyForPartitionFailover.Name; + + public override int TotalRetryCount => this.TimeoutsAndDelays.Count; + + public override 
IEnumerator<(TimeSpan requestTimeout, TimeSpan delayForNextRequest)> GetTimeoutEnumerator() + { + return this.TimeoutsAndDelays.GetEnumerator(); + } + + // Assume that it is not safe to retry unless it is a get method. + // Create and other operations could have succeeded even though a timeout occurred. + public override bool IsSafeToRetry(HttpMethod httpMethod) + { + return httpMethod == HttpMethod.Get; + } + + public override bool ShouldRetryBasedOnResponse(HttpMethod requestHttpMethod, HttpResponseMessage responseMessage) + { + return false; + } + + public override bool ShouldThrow503OnTimeout => this.shouldThrow503OnTimeout; + } +} diff --git a/Microsoft.Azure.Cosmos/src/ThinClientStoreModel.cs b/Microsoft.Azure.Cosmos/src/ThinClientStoreModel.cs index b8ae6d4c3e..fc8a3ef4dc 100644 --- a/Microsoft.Azure.Cosmos/src/ThinClientStoreModel.cs +++ b/Microsoft.Azure.Cosmos/src/ThinClientStoreModel.cs @@ -23,6 +23,7 @@ internal class ThinClientStoreModel : GatewayStoreModel public ThinClientStoreModel( GlobalEndpointManager endpointManager, + GlobalPartitionEndpointManager globalPartitionEndpointManager, ISessionContainer sessionContainer, ConsistencyLevel defaultConsistencyLevel, DocumentClientEventSource eventSource, @@ -33,7 +34,8 @@ public ThinClientStoreModel( defaultConsistencyLevel, eventSource, serializerSettings, - httpClient) + httpClient, + globalPartitionEndpointManager) { this.thinClientStoreClient = new ThinClientStoreClient( httpClient, diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosItemIntegrationTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosItemIntegrationTests.cs index 7a0f49fbb6..5ac6cdc06b 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosItemIntegrationTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosItemIntegrationTests.cs @@ -10,6 +10,7 @@ using System.Threading.Tasks; using 
Microsoft.Azure.Cosmos.FaultInjection; using Microsoft.VisualStudio.TestTools.UnitTesting; + using static Microsoft.Azure.Cosmos.Routing.GlobalPartitionEndpointManagerCore; using static Microsoft.Azure.Cosmos.SDK.EmulatorTests.MultiRegionSetupHelpers; [TestClass] @@ -24,6 +25,7 @@ public class CosmosItemIntegrationTests private static string region1; private static string region2; private static string region3; + private IDictionary readRegionsMapping; private CosmosSystemTextJsonSerializer cosmosSystemTextJsonSerializer; [TestInitialize] @@ -50,12 +52,12 @@ public async Task TestInitAsync() (this.database, this.container, this.changeFeedContainer) = await MultiRegionSetupHelpers.GetOrCreateMultiRegionDatabaseAndContainers(this.client); - IDictionary readRegions = this.client.DocumentClient.GlobalEndpointManager.GetAvailableReadEndpointsByLocation(); - Assert.IsTrue(readRegions.Count() >= 3); + this.readRegionsMapping = this.client.DocumentClient.GlobalEndpointManager.GetAvailableReadEndpointsByLocation(); + Assert.IsTrue(this.readRegionsMapping.Count() >= 3); - region1 = readRegions.Keys.ElementAt(0); - region2 = readRegions.Keys.ElementAt(1); - region3 = readRegions.Keys.ElementAt(2); + region1 = this.readRegionsMapping.Keys.ElementAt(0); + region2 = this.readRegionsMapping.Keys.ElementAt(1); + region3 = this.readRegionsMapping.Keys.ElementAt(2); } [TestCleanup] @@ -295,12 +297,16 @@ await this.container.DeleteItemAsync( [TestMethod] [TestCategory("MultiRegion")] - [DataRow("15", "10", DisplayName = "Scenario whtn the total iteration count is 15 and circuit breaker consecutive failure threshold is set to 10.")] - [DataRow("25", "20", DisplayName = "Scenario whtn the total iteration count is 25 and circuit breaker consecutive failure threshold is set to 20.")] - [DataRow("35", "30", DisplayName = "Scenario whtn the total iteration count is 35 and circuit breaker consecutive failure threshold is set to 30.")] + [DataRow(ConnectionMode.Direct, "15", "10", 
DisplayName = "Direct Mode - Scenario when the total iteration count is 15 and circuit breaker consecutive failure threshold is set to 10.")] + [DataRow(ConnectionMode.Direct, "25", "20", DisplayName = "Direct Mode - Scenario when the total iteration count is 25 and circuit breaker consecutive failure threshold is set to 20.")] + [DataRow(ConnectionMode.Direct, "35", "30", DisplayName = "Direct Mode - Scenario when the total iteration count is 35 and circuit breaker consecutive failure threshold is set to 30.")] + [DataRow(ConnectionMode.Gateway, "15", "10", DisplayName = "Gateway Mode - Scenario when the total iteration count is 15 and circuit breaker consecutive failure threshold is set to 10.")] + [DataRow(ConnectionMode.Gateway, "25", "20", DisplayName = "Gateway Mode - Scenario when the total iteration count is 25 and circuit breaker consecutive failure threshold is set to 20.")] + [DataRow(ConnectionMode.Gateway, "35", "30", DisplayName = "Gateway Mode - Scenario when the total iteration count is 35 and circuit breaker consecutive failure threshold is set to 30.")] [Owner("dkunda")] [Timeout(70000)] public async Task ReadItemAsync_WithCircuitBreakerEnabledAndSingleMasterAccountAndServiceUnavailableReceived_ShouldApplyPartitionLevelOverride( + ConnectionMode connectionMode, string iterationCount, string circuitBreakerConsecutiveFailureCount) { @@ -329,6 +335,7 @@ public async Task ReadItemAsync_WithCircuitBreakerEnabledAndSingleMasterAccountA List preferredRegions = new List { region1, region2, region3 }; CosmosClientOptions cosmosClientOptions = new CosmosClientOptions() { + ConnectionMode = connectionMode, ConsistencyLevel = ConsistencyLevel.Session, FaultInjector = faultInjector, RequestTimeout = TimeSpan.FromSeconds(5), @@ -372,15 +379,36 @@ public async Task ReadItemAsync_WithCircuitBreakerEnabledAndSingleMasterAccountA Assert.IsNotNull(contactedRegions); + PartitionKeyRangeFailoverInfo failoverInfo = 
TestCommon.GetFailoverInfoForFirstPartitionUsingReflection( + globalPartitionEndpointManager: cosmosClient.ClientContext.DocumentClient.PartitionKeyRangeLocation, + isReadOnlyOrMultiMaster: true); + if (attemptCount > consecutiveFailureCount + 1) { - Assert.IsTrue(contactedRegions.Count == 1, "Asserting that when the consecutive failure count reaches the threshold, the partition was failed over to the next region, and the subsequent read request/s were successful on the next region."); - Assert.IsTrue(contactedRegions.Contains(region2)); + if (connectionMode == ConnectionMode.Direct) + { + Assert.IsTrue(contactedRegions.Count == 1, "Asserting that when the consecutive failure count reaches the threshold, the partition was failed over to the next region, and the subsequent read request/s were successful on the next region."); + Assert.IsTrue(contactedRegions.Contains(region2)); + } + + Assert.AreEqual(this.readRegionsMapping[region2], failoverInfo.Current); } else { - Assert.IsTrue(contactedRegions.Count == 2, "Asserting that when the read request succeeds before the consecutive failure count reaches the threshold, the partition didn't over to the next region, and the request was retried on the next region."); - Assert.IsTrue(contactedRegions.Contains(region1) && contactedRegions.Contains(region2)); + if (connectionMode == ConnectionMode.Direct) + { + Assert.IsTrue(contactedRegions.Count == 2, "Asserting that when the read request succeeds before the consecutive failure count reaches the threshold, the partition didn't over to the next region, and the request was retried on the next region."); + Assert.IsTrue(contactedRegions.Contains(region1) && contactedRegions.Contains(region2)); + } + + if (attemptCount > consecutiveFailureCount) + { + Assert.AreEqual(this.readRegionsMapping[region2], failoverInfo.Current); + } + else + { + Assert.AreEqual(this.readRegionsMapping[region1], failoverInfo.Current); + } } } catch (CosmosException) @@ -740,11 +768,15 @@ public async 
Task CreateItemAsync_WithCircuitBreakerEnabledAndSingleMasterAccoun [TestMethod] [Owner("dkunda")] [TestCategory("MultiMaster")] - [DataRow("15", "10", DisplayName = "Scenario whtn the total iteration count is 15 and circuit breaker consecutive failure threshold is set to 10.")] - [DataRow("25", "20", DisplayName = "Scenario whtn the total iteration count is 25 and circuit breaker consecutive failure threshold is set to 20.")] - [DataRow("35", "30", DisplayName = "Scenario whtn the total iteration count is 35 and circuit breaker consecutive failure threshold is set to 30.")] + [DataRow(ConnectionMode.Direct, "15", "10", DisplayName = "Direct Mode - Scenario whtn the total iteration count is 15 and circuit breaker consecutive failure threshold is set to 10.")] + [DataRow(ConnectionMode.Direct, "25", "20", DisplayName = "Direct Mode - Scenario whtn the total iteration count is 25 and circuit breaker consecutive failure threshold is set to 20.")] + [DataRow(ConnectionMode.Direct, "35", "30", DisplayName = "Direct Mode - Scenario whtn the total iteration count is 35 and circuit breaker consecutive failure threshold is set to 30.")] + [DataRow(ConnectionMode.Gateway, "15", "10", DisplayName = "Gateway Mode - Scenario when the total iteration count is 15 and circuit breaker consecutive failure threshold is set to 10.")] + [DataRow(ConnectionMode.Gateway, "25", "20", DisplayName = "Gateway Mode - Scenario when the total iteration count is 25 and circuit breaker consecutive failure threshold is set to 20.")] + [DataRow(ConnectionMode.Gateway, "35", "30", DisplayName = "Gateway Mode - Scenario when the total iteration count is 35 and circuit breaker consecutive failure threshold is set to 30.")] [Timeout(70000)] public async Task CreateItemAsync_WithCircuitBreakerEnabledAndMultiMasterAccountAndServiceUnavailableReceived_ShouldApplyPartitionLevelOverride( + ConnectionMode connectionMode, string iterationCount, string circuitBreakerConsecutiveFailureCount) { @@ -779,7 +811,8 
@@ public async Task CreateItemAsync_WithCircuitBreakerEnabledAndMultiMasterAccount FaultInjector = faultInjector, RequestTimeout = TimeSpan.FromSeconds(5), ApplicationPreferredRegions = preferredRegions, - Serializer = this.cosmosSystemTextJsonSerializer + Serializer = this.cosmosSystemTextJsonSerializer, + ConnectionMode = connectionMode, }; try @@ -809,19 +842,39 @@ public async Task CreateItemAsync_WithCircuitBreakerEnabledAndMultiMasterAccount expected: HttpStatusCode.Created, actual: createResponse.StatusCode); + PartitionKeyRangeFailoverInfo failoverInfo = TestCommon.GetFailoverInfoForFirstPartitionUsingReflection( + globalPartitionEndpointManager: cosmosClient.ClientContext.DocumentClient.PartitionKeyRangeLocation, + isReadOnlyOrMultiMaster: true); + IReadOnlyList<(string regionName, Uri uri)> contactedRegionMapping = createResponse.Diagnostics.GetContactedRegions(); HashSet contactedRegions = new(contactedRegionMapping.Select(r => r.regionName)); Assert.IsNotNull(contactedRegions); if (attemptCount > consecutiveFailureCount + 1) { - Assert.IsTrue(contactedRegions.Count == 1, "Asserting that when the consecutive failure count reaches the threshold, the partition was failed over to the next region, and the subsequent write request/s were successful on the next region."); - Assert.IsTrue(contactedRegions.Contains(region2)); + if (connectionMode == ConnectionMode.Direct) + { + Assert.IsTrue(contactedRegions.Count == 1, "Asserting that when the consecutive failure count reaches the threshold, the partition was failed over to the next region, and the subsequent write request/s were successful on the next region."); + Assert.IsTrue(contactedRegions.Contains(region2)); + } + + Assert.AreEqual(this.readRegionsMapping[region2], failoverInfo.Current); } else { - Assert.IsTrue(contactedRegions.Count == 2, "Asserting that when the write requests succeeds before the consecutive failure count reaches the threshold, the partition didn't over to the next region, and the 
request was retried on the next region."); - Assert.IsTrue(contactedRegions.Contains(region1) && contactedRegions.Contains(region2)); + if (connectionMode == ConnectionMode.Direct) + { + Assert.IsTrue(contactedRegions.Count == 2, "Asserting that when the write requests succeeds before the consecutive failure count reaches the threshold, the partition didn't over to the next region, and the request was retried on the next region."); + Assert.IsTrue(contactedRegions.Contains(region1) && contactedRegions.Contains(region2)); + } + if (attemptCount > consecutiveFailureCount) + { + Assert.AreEqual(this.readRegionsMapping[region2], failoverInfo.Current); + } + else + { + Assert.AreEqual(this.readRegionsMapping[region1], failoverInfo.Current); + } } } catch (CosmosException) diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Utils/TestCommon.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Utils/TestCommon.cs index 050e6ed4a7..e66f723121 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Utils/TestCommon.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Utils/TestCommon.cs @@ -5,6 +5,7 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests { using System; + using System.Collections.Concurrent; using System.Collections.Generic; using System.Collections.ObjectModel; using System.Globalization; @@ -12,6 +13,7 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests using System.Linq; using System.Net; using System.Net.Http; + using System.Reflection; using System.Security.Cryptography; using System.Text; using System.Threading; @@ -28,6 +30,7 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests using Microsoft.Azure.Documents.Routing; using Microsoft.VisualStudio.TestTools.UnitTesting; using Newtonsoft.Json; + using static Microsoft.Azure.Cosmos.Routing.GlobalPartitionEndpointManagerCore; internal static class TestCommon { @@ -1577,6 +1580,36 @@ public static ISessionToken 
CreateSessionToken(ISessionToken from, long globalLS } } + public static PartitionKeyRangeFailoverInfo GetFailoverInfoForFirstPartitionUsingReflection( + GlobalPartitionEndpointManager globalPartitionEndpointManager, + bool isReadOnlyOrMultiMaster) + { + string fieldName = isReadOnlyOrMultiMaster + ? "PartitionKeyRangeToLocationForReadAndWrite" + : "PartitionKeyRangeToLocationForWrite"; + + if (globalPartitionEndpointManager is GlobalPartitionEndpointManagerCore globalPartitionEndpointManagerCore) + { + FieldInfo fieldInfo = globalPartitionEndpointManagerCore + .GetType() + .GetField( + name: fieldName, + bindingAttr: BindingFlags.Instance | BindingFlags.NonPublic) + ?? throw new InvalidOperationException($"Could not find '{fieldName}' field on GlobalPartitionEndpointManagerCore."); + + Lazy> pkRangeMappings = (Lazy>)fieldInfo + .GetValue( + obj: globalPartitionEndpointManagerCore); + + if (pkRangeMappings.IsValueCreated) + { + return pkRangeMappings.Value?.First().Value; + } + } + + return null; + } + private class DisposableList : IDisposable { private readonly List disposableList; diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CancellationTokenTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CancellationTokenTests.cs index 446b885f0f..56640a2789 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CancellationTokenTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CancellationTokenTests.cs @@ -64,7 +64,10 @@ async Task sendFunc(HttpRequestMessage request) null, MockCosmosUtil.CreateCosmosHttpClient( () => new HttpClient(messageHandler), - eventSource)); + eventSource), + GlobalPartitionEndpointManagerNoOp.Instance); + + TestUtils.SetupCachesInGatewayStoreModel(storeModel, endpointManager); using (new ActivityScope(Guid.NewGuid())) { @@ -210,13 +213,19 @@ private static GatewayStoreModel MockGatewayStoreModel(Func gep.ResolveServiceEndpoint(It.IsAny())).Returns(new 
Uri("http://localhost")); ISessionContainer sessionContainer = new SessionContainer(string.Empty); HttpMessageHandler messageHandler = new MockMessageHandler(sendFunc); - return new GatewayStoreModel( + + GatewayStoreModel storeModel = new GatewayStoreModel( endpointManager.Object, sessionContainer, Cosmos.ConsistencyLevel.Eventual, new DocumentClientEventSource(), new JsonSerializerSettings(), - MockCosmosUtil.CreateCosmosHttpClient(() => new HttpClient(messageHandler))); + MockCosmosUtil.CreateCosmosHttpClient(() => new HttpClient(messageHandler)), + GlobalPartitionEndpointManagerNoOp.Instance); + + TestUtils.SetupCachesInGatewayStoreModel(storeModel, endpointManager.Object); + + return storeModel; } // Creates a StoreModel that will return addresses for normal requests and throw for address refresh diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ExceptionlessTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ExceptionlessTests.cs index 4db6fe3130..b9539ef8c4 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ExceptionlessTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ExceptionlessTests.cs @@ -320,7 +320,8 @@ private static GatewayStoreModel MockGatewayStoreModel(Func new HttpClient(messageHandler))); + MockCosmosUtil.CreateCosmosHttpClient(() => new HttpClient(messageHandler)), + GlobalPartitionEndpointManagerNoOp.Instance); } private static Mock GetMockAddressCache(AddressInformation[] addressInformation) diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/GatewayAddressCacheTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/GatewayAddressCacheTests.cs index c1dff0e79e..54b49e4f82 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/GatewayAddressCacheTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/GatewayAddressCacheTests.cs @@ -1299,7 +1299,7 @@ public async Task 
TryGetAddressesAsync_WhenReplicaVlidationEnabledAndUnhealthyUr // A delay of 2 minute was added to make the replica unhealthy for more than one minute. This // will make sure the unhealthy replica gets a chance to re-validate it's health status. - ReflectionUtils.AddMinuteToDateTimeFieldUsingReflection( + TestUtils.AddMinuteToDateTimeFieldUsingReflection( objectName: refreshedUri.GetCurrentHealthState(), fieldName: "lastUnhealthyTimestamp", delayInMinutes: -2); @@ -1450,7 +1450,7 @@ await cache.MarkAddressesToUnhealthyAsync( .ReplicaTransportAddressUris .Single(x => x.ToString().Equals(addressTobeMarkedUnhealthy)); - ReflectionUtils.AddMinuteToDateTimeFieldUsingReflection( + TestUtils.AddMinuteToDateTimeFieldUsingReflection( objectName: refreshedUri.GetCurrentHealthState(), fieldName: "lastUnhealthyTimestamp", delayInMinutes: -2 * iterationIndex); diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/GatewayStoreModelTest.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/GatewayStoreModelTest.cs index 61bca004e3..f5e9a95cea 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/GatewayStoreModelTest.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/GatewayStoreModelTest.cs @@ -146,7 +146,10 @@ public async Task TestRetries() ConsistencyLevel.Eventual, eventSource, null, - MockCosmosUtil.CreateCosmosHttpClient(() => new HttpClient(messageHandler))); + MockCosmosUtil.CreateCosmosHttpClient(() => new HttpClient(messageHandler)), + GlobalPartitionEndpointManagerNoOp.Instance); + + TestUtils.SetupCachesInGatewayStoreModel(storeModel, endpointManager); using (new ActivityScope(Guid.NewGuid())) { @@ -205,7 +208,10 @@ public async Task PassesPropertiesFromDocumentServiceRequest() ConsistencyLevel.Eventual, eventSource, null, - MockCosmosUtil.CreateCosmosHttpClient(() => new HttpClient(messageHandler))); + MockCosmosUtil.CreateCosmosHttpClient(() => new HttpClient(messageHandler)), + 
GlobalPartitionEndpointManagerNoOp.Instance); + + TestUtils.SetupCachesInGatewayStoreModel(storeModel, endpointManager); using (new ActivityScope(Guid.NewGuid())) { @@ -553,7 +559,10 @@ public async Task TestErrorResponsesProvideBody() ConsistencyLevel.Eventual, eventSource, null, - MockCosmosUtil.CreateCosmosHttpClient(() => new HttpClient(messageHandler))); + MockCosmosUtil.CreateCosmosHttpClient(() => new HttpClient(messageHandler)), + GlobalPartitionEndpointManagerNoOp.Instance); + + TestUtils.SetupCachesInGatewayStoreModel(storeModel, endpointManager); using (new ActivityScope(Guid.NewGuid())) { @@ -612,7 +621,10 @@ private async Task GatewayStoreModel_Exception_UpdateSessionTokenOnKnownExceptio ConsistencyLevel.Eventual, eventSource, null, - MockCosmosUtil.CreateCosmosHttpClient(() => new HttpClient(messageHandler))); + MockCosmosUtil.CreateCosmosHttpClient(() => new HttpClient(messageHandler)), + GlobalPartitionEndpointManagerNoOp.Instance); + + TestUtils.SetupCachesInGatewayStoreModel(storeModel, endpointManager); INameValueCollection headers = new RequestNameValueCollection(); headers.Set(HttpConstants.HttpHeaders.ConsistencyLevel, ConsistencyLevel.Session.ToString()); @@ -673,7 +685,8 @@ private async Task GatewayStoreModel_Exception_NotUpdateSessionTokenOnKnownExcep ConsistencyLevel.Eventual, eventSource, null, - MockCosmosUtil.CreateCosmosHttpClient(() => new HttpClient(messageHandler))); + MockCosmosUtil.CreateCosmosHttpClient(() => new HttpClient(messageHandler)), + GlobalPartitionEndpointManagerNoOp.Instance); INameValueCollection headers = new RequestNameValueCollection(); headers.Set(HttpConstants.HttpHeaders.ConsistencyLevel, ConsistencyLevel.Session.ToString()); @@ -821,8 +834,10 @@ private async Task GatewayStoreModel_Exceptionless_UpdateSessionTokenOnKnownResp ConsistencyLevel.Eventual, eventSource, null, - MockCosmosUtil.CreateCosmosHttpClient(() => new HttpClient(messageHandler))); + MockCosmosUtil.CreateCosmosHttpClient(() => new 
HttpClient(messageHandler)), + GlobalPartitionEndpointManagerNoOp.Instance); + TestUtils.SetupCachesInGatewayStoreModel(storeModel, endpointManager); INameValueCollection headers = new RequestNameValueCollection(); headers.Set(HttpConstants.HttpHeaders.ConsistencyLevel, ConsistencyLevel.Session.ToString()); headers.Set(HttpConstants.HttpHeaders.SessionToken, originalSessionToken); @@ -928,8 +943,10 @@ private async Task GatewayStoreModel_Exceptionless_NotUpdateSessionTokenOnKnownR ConsistencyLevel.Eventual, eventSource, null, - MockCosmosUtil.CreateCosmosHttpClient(() => new HttpClient(messageHandler))); + MockCosmosUtil.CreateCosmosHttpClient(() => new HttpClient(messageHandler)), + GlobalPartitionEndpointManagerNoOp.Instance); + TestUtils.SetupCachesInGatewayStoreModel(storeModel, endpointManager); INameValueCollection headers = new RequestNameValueCollection(); headers.Set(HttpConstants.HttpHeaders.ConsistencyLevel, ConsistencyLevel.Session.ToString()); headers.Set(HttpConstants.HttpHeaders.SessionToken, originalSessionToken); @@ -967,7 +984,8 @@ public async Task GatewayStoreModel_AvoidGlobalSessionToken() ConsistencyLevel.Session, eventSource, null, - MockCosmosUtil.CreateCosmosHttpClient(() => new HttpClient())); + MockCosmosUtil.CreateCosmosHttpClient(() => new HttpClient()), + GlobalPartitionEndpointManagerNoOp.Instance); Mock clientCollectionCache = new Mock(new SessionContainer("testhost"), storeModel, null, null, null, false); Mock partitionKeyRangeCache = new Mock(null, storeModel, clientCollectionCache.Object, endpointManager, false); @@ -1061,7 +1079,8 @@ Task sendFunc(HttpRequestMessage request) ConsistencyLevel.Session, eventSource, null, - MockCosmosUtil.CreateCosmosHttpClient(() => new HttpClient(messageHandler))); + MockCosmosUtil.CreateCosmosHttpClient(() => new HttpClient(messageHandler)), + GlobalPartitionEndpointManagerNoOp.Instance); Mock clientCollectionCache = new Mock(new SessionContainer("testhost"), storeModel, null, null, null, false); 
@@ -1267,7 +1286,8 @@ static async Task messageHandler(HttpRequestMessage request ConsistencyLevel.Eventual, eventSource, null, - MockCosmosUtil.CreateCosmosHttpClient(() => new HttpClient(httpMessageHandler))); + MockCosmosUtil.CreateCosmosHttpClient(() => new HttpClient(httpMessageHandler)), + GlobalPartitionEndpointManagerNoOp.Instance); ClientCollectionCache clientCollectionCache = new Mock(new SessionContainer("testhost"), storeModel, null, null, null, false).Object; PartitionKeyRangeCache partitionKeyRangeCache = new Mock(null, storeModel, clientCollectionCache, endpointManager, false).Object; diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ThinClientStoreModelTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ThinClientStoreModelTests.cs index 8ccbc5a94b..fcbb7baa2d 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ThinClientStoreModelTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ThinClientStoreModelTests.cs @@ -48,6 +48,7 @@ public void TestInitialize() this.thinClientStoreModel = new ThinClientStoreModel( endpointManager: this.endpointManager, + globalPartitionEndpointManager: GlobalPartitionEndpointManagerNoOp.Instance, sessionContainer: this.sessionContainer, defaultConsistencyLevel: (Cosmos.ConsistencyLevel)this.defaultConsistencyLevel, eventSource: new DocumentClientEventSource(), @@ -118,6 +119,7 @@ public async Task ProcessMessageAsync_Success_ShouldReturnDocumentServiceRespons ThinClientStoreModel storeModel = new ThinClientStoreModel( endpointManager: multiEndpointMgr, + globalPartitionEndpointManager: GlobalPartitionEndpointManagerNoOp.Instance, sessionContainer: this.sessionContainer, defaultConsistencyLevel: (Cosmos.ConsistencyLevel)this.defaultConsistencyLevel, eventSource: new DocumentClientEventSource(), @@ -213,6 +215,7 @@ public async Task ProcessMessageAsync_404_ShouldThrowDocumentClientException() ThinClientStoreModel storeModel = new 
ThinClientStoreModel( endpointManager: endpointManagerOk, + globalPartitionEndpointManager: GlobalPartitionEndpointManagerNoOp.Instance, sessionContainer: this.sessionContainer, defaultConsistencyLevel: (Cosmos.ConsistencyLevel)this.defaultConsistencyLevel, eventSource: new DocumentClientEventSource(), diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Utils/ReflectionUtils.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Utils/ReflectionUtils.cs deleted file mode 100644 index 4641befaf2..0000000000 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Utils/ReflectionUtils.cs +++ /dev/null @@ -1,43 +0,0 @@ -//------------------------------------------------------------ -// Copyright (c) Microsoft Corporation. All rights reserved. -//------------------------------------------------------------ - -namespace Microsoft.Azure.Cosmos.Tests -{ - using System; - using System.Reflection; - - /// - /// Common utility class for reflaction related operations. - /// - internal static class ReflectionUtils - { - /// - /// This helper method uses reflection to set the private and read only fields - /// to the disered values to help the test cases mimic the expected behavior. - /// - /// An object where reflection will be applied to update the field. - /// A string containing the internal field name. - /// An integer to add or substract the desired delay in minutes. - internal static void AddMinuteToDateTimeFieldUsingReflection( - object objectName, - string fieldName, - int delayInMinutes) - { - FieldInfo fieldInfo = objectName - .GetType() - .GetField( - name: fieldName, - bindingAttr: BindingFlags.Instance | BindingFlags.NonPublic); - - DateTime? 
fieldValue = (DateTime?)fieldInfo - .GetValue( - obj: objectName); - - fieldInfo - .SetValue( - obj: objectName, - value: ((DateTime)fieldValue).AddMinutes(delayInMinutes)); - } - } -} \ No newline at end of file diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Utils/TestUtils.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Utils/TestUtils.cs new file mode 100644 index 0000000000..ffdd8b98e7 --- /dev/null +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Utils/TestUtils.cs @@ -0,0 +1,82 @@ +//------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +//------------------------------------------------------------ + +namespace Microsoft.Azure.Cosmos.Tests +{ + using System; + using System.Collections.ObjectModel; + using System.Reflection; + using Microsoft.Azure.Cosmos.Common; + using Microsoft.Azure.Cosmos.Routing; + using Moq; + using System.Threading.Tasks; + using System.Threading; + using Microsoft.Azure.Cosmos.Tracing; + using Microsoft.Azure.Documents; + + /// + /// Common utility class for unit tests. + /// + internal static class TestUtils + { + /// + /// This helper method uses reflection to set the private and read only fields + /// to the desired values to help the test cases mimic the expected behavior. + /// + /// An object where reflection will be applied to update the field. + /// A string containing the internal field name. + /// An integer to add or subtract the desired delay in minutes. + internal static void AddMinuteToDateTimeFieldUsingReflection( + object objectName, + string fieldName, + int delayInMinutes) + { + FieldInfo fieldInfo = objectName + .GetType() + .GetField( + name: fieldName, + bindingAttr: BindingFlags.Instance | BindingFlags.NonPublic); + + DateTime? 
fieldValue = (DateTime?)fieldInfo + .GetValue( + obj: objectName); + + fieldInfo + .SetValue( + obj: objectName, + value: ((DateTime)fieldValue).AddMinutes(delayInMinutes)); + } + + public static void SetupCachesInGatewayStoreModel( + GatewayStoreModel storeModel, + GlobalEndpointManager endpointManager) + { + PartitionKeyDefinition partitionKeyDefinition = new PartitionKeyDefinition() + { + Kind = PartitionKind.Hash, + Paths = new Collection() + { + "/id" + } + }; + + // Prepare mocked caches. + Mock clientCollectionCache = new Mock(new SessionContainer("testhost"), storeModel, null, null, null, false); + Mock partitionKeyRangeCache = new Mock(null, storeModel, clientCollectionCache.Object, endpointManager, false); + + ContainerProperties containerProperties = ContainerProperties.CreateWithResourceId("test"); + containerProperties.PartitionKey = partitionKeyDefinition; + clientCollectionCache.Setup + (m => + m.ResolveCollectionAsync( + It.IsAny(), + It.IsAny(), + It.IsAny() + ) + ).Returns(Task.FromResult(containerProperties)); + + storeModel.SetCaches(partitionKeyRangeCache.Object, clientCollectionCache.Object); + } + } +} \ No newline at end of file