diff --git a/Microsoft.Azure.Cosmos/src/GatewayStoreModel.cs b/Microsoft.Azure.Cosmos/src/GatewayStoreModel.cs index db8a549387..247bac65a3 100644 --- a/Microsoft.Azure.Cosmos/src/GatewayStoreModel.cs +++ b/Microsoft.Azure.Cosmos/src/GatewayStoreModel.cs @@ -552,31 +552,54 @@ internal static bool IsStoredProcedureCrudOperation( operationType != Documents.OperationType.ExecuteJavaScript; } - internal static bool IsOperationSupportedByThinClient(DocumentServiceRequest request) - { - // Document operations - if (request.ResourceType == ResourceType.Document - && (request.OperationType == OperationType.Batch - || request.OperationType == OperationType.Patch - || request.OperationType == OperationType.Create - || request.OperationType == OperationType.Read - || request.OperationType == OperationType.Upsert - || request.OperationType == OperationType.Replace - || request.OperationType == OperationType.Delete - || request.OperationType == OperationType.Query - || request.OperationType == OperationType.QueryPlan)) - { - return true; - } - - // Stored Procedure execution - if (request.ResourceType == ResourceType.StoredProcedure - && request.OperationType == OperationType.ExecuteJavaScript) - { - return true; - } - - return false; + internal static bool IsOperationSupportedByThinClient(DocumentServiceRequest request) + { + // Document operations + if (request.ResourceType == ResourceType.Document + && (request.OperationType == OperationType.Batch + || request.OperationType == OperationType.Patch + || request.OperationType == OperationType.Create + || request.OperationType == OperationType.Read + || request.OperationType == OperationType.Upsert + || request.OperationType == OperationType.Replace + || request.OperationType == OperationType.Delete + || request.OperationType == OperationType.Query + || request.OperationType == OperationType.QueryPlan)) + { + return true; + } + + // LatestVersion (Incremental) ChangeFeed on documents. + // AllVersionsAndDeletes (FullFidelity) is excluded because it requires + // split-handling logic in Compute Gateway (UseGatewayMode is set by ChangeFeedModeFullFidelity). + if (request.ResourceType == ResourceType.Document + && request.OperationType == OperationType.ReadFeed + && GatewayStoreModel.IsLatestVersionChangeFeedRequest(request)) + { + return true; + } + + // Stored Procedure execution + if (request.ResourceType == ResourceType.StoredProcedure + && request.OperationType == OperationType.ExecuteJavaScript) + { + return true; + } + + return false; + } + + /// + /// Determines if the request is a LatestVersion (Incremental) change feed request that can + /// be routed to the thin client. Returns true only when the A-IM header is exactly + /// HttpConstants.A_IMHeaderValues.IncrementalFeed. Any other value — including + /// Full-Fidelity Feed (AllVersionsAndDeletes) or an unknown future mode — falls back to + /// Compute Gateway so that new modes are not accidentally routed to the thin client. + /// + internal static bool IsLatestVersionChangeFeedRequest(DocumentServiceRequest request) + { + string aImHeaderValue = request.Headers[HttpConstants.HttpHeaders.A_IM]; + return string.Equals(aImHeaderValue, HttpConstants.A_IMHeaderValues.IncrementalFeed, StringComparison.OrdinalIgnoreCase); } private async Task GetDatabaseAccountPropertiesAsync() { diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosItemThinClientTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosItemThinClientTests.cs index d34866a6ca..fd67e937b5 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosItemThinClientTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosItemThinClientTests.cs @@ -34,7 +34,7 @@ public class CosmosItemThinClientTests [TestInitialize] public async Task TestInitAsync() { - Environment.SetEnvironmentVariable(ConfigurationManager.ThinClientModeEnabled, "True"); + Environment.SetEnvironmentVariable(ConfigurationManager.ThinClientModeEnabled, "True"); this.connectionString = Environment.GetEnvironmentVariable("COSMOSDB_THINCLIENT"); if (string.IsNullOrEmpty(this.connectionString)) @@ -1354,6 +1354,52 @@ public async Task TestThinClientQueryPlanMultiPartitionFanout() } } + [TestMethod] + [TestCategory("ThinClient")] + public async Task TestThinClientChangeFeedLatestVersionAsync() + { + // Arrange: + string pk = "pk_changefeed"; + List items = this.GenerateItems(pk).Take(10).ToList(); + List createdItems = await this.CreateItemsSafeAsync(items); + + Assert.IsTrue(createdItems.Count > 0, "At least one item must be created for the change feed test."); + + // Act: Read change feed using LatestVersion mode + List changeFeedResults = new List(); + FeedIterator changeFeedIterator = this.container.GetChangeFeedIterator( + ChangeFeedStartFrom.Beginning(), + ChangeFeedMode.LatestVersion, + new ChangeFeedRequestOptions() + { + PageSizeHint = 10 + }); + + while (changeFeedIterator.HasMoreResults) + { + FeedResponse response = await changeFeedIterator.ReadNextAsync(); + + if (response.StatusCode == HttpStatusCode.NotModified) + { + break; + } + + string diagnostics = response.Diagnostics.ToString(); + Assert.IsTrue(diagnostics.Contains("|F4"), "Diagnostics User Agent should contain '|F4' for ThinClient change feed"); + + changeFeedResults.AddRange(response); + } + + // Assert: Verify all created items appear in the change feed + Assert.IsTrue(changeFeedResults.Count >= createdItems.Count, + $"Change feed should return at least {createdItems.Count} items but got {changeFeedResults.Count}."); + + HashSet createdIds = new HashSet(createdItems.Select(i => i.Id)); + HashSet changeFeedIds = new HashSet(changeFeedResults.Select(i => i.Id)); + Assert.IsTrue(createdIds.IsSubsetOf(changeFeedIds), + "All created items should appear in the change feed results."); + } + /// /// DelegatingHandler that intercepts HTTP requests and can inject faults /// diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/GatewayStoreModelTest.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/GatewayStoreModelTest.cs index 38ba61c481..8f175fafec 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/GatewayStoreModelTest.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/GatewayStoreModelTest.cs @@ -1332,11 +1332,13 @@ public async Task ThinClient_ProcessMessageAsync_WithUnsupportedOperations_Shoul It.IsAny())) .ReturnsAsync(successResponse); - DocumentServiceRequest request = DocumentServiceRequest.Create( - operationType: OperationType.ReadFeed, - resourceType: ResourceType.Document, - resourceId: "NH1uAJ6ANm0=", - body: null, + // Plain ReadFeed without an A-IM header is a realistic unsupported operation + // (document enumeration, not a change feed). + DocumentServiceRequest request = DocumentServiceRequest.Create( + operationType: OperationType.ReadFeed, + resourceType: ResourceType.Document, + resourceId: "NH1uAJ6ANm0=", + body: null, authorizationTokenType: AuthorizationTokenType.PrimaryMasterKey); Mock docClientMulti = new Mock(); @@ -1384,15 +1386,227 @@ public async Task ThinClient_ProcessMessageAsync_WithUnsupportedOperations_Shoul // Act DocumentServiceResponse response = await storeModel.ProcessMessageAsync(request); - - // Assert - Assert.IsNotNull(response); - Assert.AreEqual(HttpStatusCode.OK, response.StatusCode); - } - - [TestMethod] - [Owner("aavasthy")] - public void ThinClient_Dispose_ShouldDisposeThinClientStoreClient() + + // Assert + Assert.IsNotNull(response); + Assert.AreEqual(HttpStatusCode.OK, response.StatusCode); + } + + [TestMethod] + [Owner("aavasthy")] + public async Task ThinClient_ProcessMessageAsync_WithLatestVersionChangeFeed_ShouldRouteToThinClient() + { + // Arrange + bool thinClientInvoked = false; + MockThinClientStoreClient thinClientStoreClient = new MockThinClientStoreClient( + (request, resourceType, uri, endpoint, globalDatabaseAccountName, clientCollectionCache, cancellationToken) => + { + thinClientInvoked = true; + INameValueCollection headers = new StoreResponseNameValueCollection(); + return Task.FromResult(new DocumentServiceResponse(Stream.Null, headers, HttpStatusCode.OK)); + }); + + Mock mockDocumentClient = new Mock(); + mockDocumentClient.Setup(c => c.ServiceEndpoint).Returns(new Uri("https://mock.proxy.com")); + mockDocumentClient + .Setup(c => c.GetDatabaseAccountInternalAsync(It.IsAny(), It.IsAny())) + .ReturnsAsync(new AccountProperties()); + + UserAgentContainer userAgentContainer = new UserAgentContainer(0, "TestFeature", "TestRegion", "TestSuffix"); + GlobalEndpointManager endpointManager = new GlobalEndpointManager(mockDocumentClient.Object, new ConnectionPolicy()); + SessionContainer sessionContainer = new SessionContainer("testhost"); + + GatewayStoreModel storeModel = new GatewayStoreModel( + endpointManager, + sessionContainer, + ConsistencyLevel.Session, + new DocumentClientEventSource(), + null, + null, + GlobalPartitionEndpointManagerNoOp.Instance, + isThinClientEnabled: true, + userAgentContainer); + + ClientCollectionCache clientCollectionCache = new Mock( + sessionContainer, + storeModel, + null, + null, + null, + false).Object; + + PartitionKeyRangeCache partitionKeyRangeCache = new Mock( + null, + storeModel, + clientCollectionCache, + endpointManager, + false, false).Object; + + storeModel.SetCaches(partitionKeyRangeCache, clientCollectionCache); + ReplaceThinClientStoreClientField(storeModel, thinClientStoreClient); + + // Create a ReadFeed request with A-IM: Incremental feed (LatestVersion change feed) + DocumentServiceRequest request = DocumentServiceRequest.Create( + operationType: OperationType.ReadFeed, + resourceType: ResourceType.Document, + resourceId: "NH1uAJ6ANm0=", + body: null, + authorizationTokenType: AuthorizationTokenType.PrimaryMasterKey); + request.Headers[HttpConstants.HttpHeaders.A_IM] = HttpConstants.A_IMHeaderValues.IncrementalFeed; + + // Act + DocumentServiceResponse response = await storeModel.ProcessMessageAsync(request); + + // Assert + Assert.IsNotNull(response); + Assert.AreEqual(HttpStatusCode.OK, response.StatusCode); + Assert.IsTrue(thinClientInvoked, "LatestVersion change feed should be routed to ThinClient"); + } + + [TestMethod] + [Owner("aavasthy")] + public async Task ThinClient_ProcessMessageAsync_WithFullFidelityChangeFeed_ShouldFallbackToGateway() + { + // Arrange + bool thinClientInvoked = false; + MockThinClientStoreClient thinClientStoreClient = new MockThinClientStoreClient( + (request, resourceType, uri, endpoint, globalDatabaseAccountName, clientCollectionCache, cancellationToken) => + { + thinClientInvoked = true; + INameValueCollection headers = new StoreResponseNameValueCollection(); + return Task.FromResult(new DocumentServiceResponse(Stream.Null, headers, HttpStatusCode.OK)); + }); + + HttpResponseMessage successResponse = new HttpResponseMessage(HttpStatusCode.OK) { Content = new StringContent("Response") }; + Mock mockCosmosHttpClient = new Mock(); + mockCosmosHttpClient.Setup(client => client.SendHttpAsync( + It.IsAny>>(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny())) + .ReturnsAsync(successResponse); + + Mock mockDocumentClient = new Mock(); + mockDocumentClient.Setup(c => c.ServiceEndpoint).Returns(new Uri("https://mock.proxy.com")); + mockDocumentClient + .Setup(c => c.GetDatabaseAccountInternalAsync(It.IsAny(), It.IsAny())) + .ReturnsAsync(new AccountProperties()); + + UserAgentContainer userAgentContainer = new UserAgentContainer(0, "TestFeature", "TestRegion", "TestSuffix"); + GlobalEndpointManager endpointManager = new GlobalEndpointManager(mockDocumentClient.Object, new ConnectionPolicy()); + SessionContainer sessionContainer = new SessionContainer("testhost"); + + GatewayStoreModel storeModel = new GatewayStoreModel( + endpointManager, + sessionContainer, + ConsistencyLevel.Session, + new DocumentClientEventSource(), + null, + httpClient: mockCosmosHttpClient.Object, + globalPartitionEndpointManager: GlobalPartitionEndpointManagerNoOp.Instance, + isThinClientEnabled: true, + userAgentContainer: userAgentContainer); + + ClientCollectionCache clientCollectionCache = new Mock( + sessionContainer, + storeModel, + null, + null, + null, + false).Object; + + PartitionKeyRangeCache partitionKeyRangeCache = new Mock( + null, + storeModel, + clientCollectionCache, + endpointManager, + false, false).Object; + + storeModel.SetCaches(partitionKeyRangeCache, clientCollectionCache); + ReplaceThinClientStoreClientField(storeModel, thinClientStoreClient); + + // Create a ReadFeed request with A-IM: Full-Fidelity Feed (AllVersionsAndDeletes change feed) + DocumentServiceRequest request = DocumentServiceRequest.Create( + operationType: OperationType.ReadFeed, + resourceType: ResourceType.Document, + resourceId: "NH1uAJ6ANm0=", + body: null, + authorizationTokenType: AuthorizationTokenType.PrimaryMasterKey); + request.Headers[HttpConstants.HttpHeaders.A_IM] = HttpConstants.A_IMHeaderValues.FullFidelityFeed; + + // Act + DocumentServiceResponse response = await storeModel.ProcessMessageAsync(request); + + // Assert + Assert.IsNotNull(response); + Assert.AreEqual(HttpStatusCode.OK, response.StatusCode); + Assert.IsFalse(thinClientInvoked, "AllVersionsAndDeletes change feed should NOT be routed to ThinClient"); + } + + [TestMethod] + [Owner("aavasthy")] + public void IsOperationSupportedByThinClient_WithChangeFeedModes_ReturnsCorrectResults() + { + // LatestVersion (Incremental) change feed should be supported + DocumentServiceRequest incrementalRequest = DocumentServiceRequest.Create( + operationType: OperationType.ReadFeed, + resourceType: ResourceType.Document, + resourceId: "NH1uAJ6ANm0=", + body: null, + authorizationTokenType: AuthorizationTokenType.PrimaryMasterKey); + incrementalRequest.Headers[HttpConstants.HttpHeaders.A_IM] = HttpConstants.A_IMHeaderValues.IncrementalFeed; + Assert.IsTrue(GatewayStoreModel.IsOperationSupportedByThinClient(incrementalRequest), + "LatestVersion (Incremental) change feed should be supported by ThinClient"); + + // AllVersionsAndDeletes (FullFidelity) change feed should NOT be supported + DocumentServiceRequest fullFidelityRequest = DocumentServiceRequest.Create( + operationType: OperationType.ReadFeed, + resourceType: ResourceType.Document, + resourceId: "NH1uAJ6ANm0=", + body: null, + authorizationTokenType: AuthorizationTokenType.PrimaryMasterKey); + fullFidelityRequest.Headers[HttpConstants.HttpHeaders.A_IM] = HttpConstants.A_IMHeaderValues.FullFidelityFeed; + Assert.IsFalse(GatewayStoreModel.IsOperationSupportedByThinClient(fullFidelityRequest), + "AllVersionsAndDeletes (FullFidelity) change feed should NOT be supported by ThinClient"); + + // Plain ReadFeed (no A-IM header) should NOT be supported + DocumentServiceRequest plainReadFeedRequest = DocumentServiceRequest.Create( + operationType: OperationType.ReadFeed, + resourceType: ResourceType.Document, + resourceId: "NH1uAJ6ANm0=", + body: null, + authorizationTokenType: AuthorizationTokenType.PrimaryMasterKey); + Assert.IsFalse(GatewayStoreModel.IsOperationSupportedByThinClient(plainReadFeedRequest), + "Plain ReadFeed (non-change-feed) should NOT be supported by ThinClient"); + + // ReadFeed on non-Document resource should NOT be supported + DocumentServiceRequest nonDocumentRequest = DocumentServiceRequest.Create( + operationType: OperationType.ReadFeed, + resourceType: ResourceType.Collection, + resourceId: "NH1uAJ6ANm0=", + body: null, + authorizationTokenType: AuthorizationTokenType.PrimaryMasterKey); + nonDocumentRequest.Headers[HttpConstants.HttpHeaders.A_IM] = HttpConstants.A_IMHeaderValues.IncrementalFeed; + Assert.IsFalse(GatewayStoreModel.IsOperationSupportedByThinClient(nonDocumentRequest), + "ReadFeed on non-Document resource should NOT be supported by ThinClient"); + + // Unknown A-IM header value should NOT be supported (fail-safe for future feed modes) + DocumentServiceRequest unknownAImRequest = DocumentServiceRequest.Create( + operationType: OperationType.ReadFeed, + resourceType: ResourceType.Document, + resourceId: "NH1uAJ6ANm0=", + body: null, + authorizationTokenType: AuthorizationTokenType.PrimaryMasterKey); + unknownAImRequest.Headers[HttpConstants.HttpHeaders.A_IM] = "Unknown-Feed"; + Assert.IsFalse(GatewayStoreModel.IsOperationSupportedByThinClient(unknownAImRequest), + "Unknown A-IM values should NOT be supported by ThinClient"); + } + + [TestMethod] + [Owner("aavasthy")] + public void ThinClient_Dispose_ShouldDisposeThinClientStoreClient() { bool disposeCalled = false;