diff --git a/Microsoft.Azure.Cosmos/src/AssemblyInfo.cs b/Microsoft.Azure.Cosmos/src/AssemblyInfo.cs index fc19970ff2..666ecfc7f8 100644 --- a/Microsoft.Azure.Cosmos/src/AssemblyInfo.cs +++ b/Microsoft.Azure.Cosmos/src/AssemblyInfo.cs @@ -4,36 +4,29 @@ using System.Runtime.CompilerServices; -#if !SignAssembly -[assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.Tests")] -[assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.EmulatorTests")] -[assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.Friends.Tests")] -[assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.Friends")] -[assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.Extensions.Tests")] -[assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.Extensions")] -[assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.NetFramework.Tests")] -[assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.Performance.Tests")] -[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2")] -#else +#if SignAssembly [assembly: InternalsVisibleTo("DynamicProxyGenAssembly2" + AssemblyRef.MoqPublicKey)] -[assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.Tests" + AssemblyRef.ProductPublicKey)] -[assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.EmulatorTests" + AssemblyRef.ProductPublicKey)] -[assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.Friends.Tests" + AssemblyRef.ProductPublicKey)] [assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.Friends" + AssemblyRef.ProductPublicKey)] -[assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.Extensions.Tests" + AssemblyRef.ProductPublicKey)] -[assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.Extensions" + AssemblyRef.ProductPublicKey)] -[assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.NetFramework.Tests" + AssemblyRef.ProductPublicKey)] -[assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.Tests" + AssemblyRef.TestPublicKey)] -[assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.EmulatorTests" + AssemblyRef.TestPublicKey)] -[assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.Friends.Tests" + AssemblyRef.TestPublicKey)] [assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.Friends" + AssemblyRef.TestPublicKey)] -[assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.Extensions.Tests" + AssemblyRef.TestPublicKey)] +[assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.Extensions" + AssemblyRef.ProductPublicKey)] [assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.Extensions" + AssemblyRef.TestPublicKey)] -[assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.NetFramework.Tests" + AssemblyRef.TestPublicKey)] +[assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.Extensions.Tests" + AssemblyRef.ProductPublicKey)] +[assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.Extensions.Tests" + AssemblyRef.TestPublicKey)] +[assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.Friends.Tests" + AssemblyRef.ProductPublicKey)] +[assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.Friends.Tests" + AssemblyRef.TestPublicKey)] [assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.Table" + AssemblyRef.ProductPublicKey)] [assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.Table" + AssemblyRef.TestPublicKey)] [assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.Table.Tests" + AssemblyRef.ProductPublicKey)] [assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.Table.Tests" + AssemblyRef.TestPublicKey)] + [assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.Performance.Tests" + AssemblyRef.ProductPublicKey)] -[assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.Performance.Tests" + AssemblyRef.TestPublicKey)] +[assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.Tests" + AssemblyRef.ProductPublicKey)] +[assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.EmulatorTests" + AssemblyRef.ProductPublicKey)] +[assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.NetFramework.Tests" + AssemblyRef.ProductPublicKey)] +#else +[assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.Tests")] +[assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.EmulatorTests")] +[assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.NetFramework.Tests")] +[assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.Performance.Tests")] +[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2")] #endif \ No newline at end of file diff --git a/Microsoft.Azure.Cosmos/src/Handler/CosmosRequestMessage.cs b/Microsoft.Azure.Cosmos/src/Handler/CosmosRequestMessage.cs index fcb4960d4b..e55e40f78d 100644 --- a/Microsoft.Azure.Cosmos/src/Handler/CosmosRequestMessage.cs +++ b/Microsoft.Azure.Cosmos/src/Handler/CosmosRequestMessage.cs @@ -82,12 +82,16 @@ public virtual Stream Content internal OperationType OperationType { get; set; } + internal string PartitionKeyRangeId { get; set; } + internal DocumentServiceRequest DocumentServiceRequest { get; set; } internal IDocumentClientRetryPolicy DocumentClientRetryPolicy { get; set; } internal bool IsPropertiesInitialized => this.properties.IsValueCreated; + internal bool IsDocumentFeedOperation => this.OperationType == OperationType.ReadFeed && this.ResourceType == ResourceType.Document && string.IsNullOrEmpty(this.PartitionKeyRangeId); + /// /// Request properties Per request context available to handlers. /// These will not be automatically included into the wire. @@ -175,6 +179,12 @@ internal DocumentServiceRequest ToDocumentServiceRequest() serviceRequest = new DocumentServiceRequest(this.OperationType, this.ResourceType, this.RequestUri?.ToString(), this.Content, AuthorizationTokenType.PrimaryMasterKey, this.Headers.CosmosMessageHeaders); } + // Routing to a particular PartitionKeyRangeId + if (!string.IsNullOrEmpty(this.PartitionKeyRangeId)) + { + serviceRequest.RouteTo(new PartitionKeyRangeIdentity(this.PartitionKeyRangeId)); + } + serviceRequest.UseStatusCodeForFailures = true; serviceRequest.UseStatusCodeFor429 = true; serviceRequest.Properties = this.Properties; diff --git a/Microsoft.Azure.Cosmos/src/Handler/RouterHandler.cs b/Microsoft.Azure.Cosmos/src/Handler/RouterHandler.cs index 6ad623e816..4cc569dcc3 100644 --- a/Microsoft.Azure.Cosmos/src/Handler/RouterHandler.cs +++ b/Microsoft.Azure.Cosmos/src/Handler/RouterHandler.cs @@ -5,11 +5,8 @@ namespace Microsoft.Azure.Cosmos.Handlers { using System; - using System.Diagnostics; - using System.Net.Http; using System.Threading; using System.Threading.Tasks; - using Microsoft.Azure.Cosmos.Internal; using Microsoft.Azure.Documents; /// @@ -17,16 +14,16 @@ namespace Microsoft.Azure.Cosmos.Handlers /// internal class RouterHandler : CosmosRequestHandler { - private readonly CosmosRequestHandler doucumentFeedHandler; + private readonly CosmosRequestHandler documentFeedHandler; private readonly CosmosRequestHandler pointOperationHandler; public RouterHandler( - CosmosRequestHandler doucumentFeedHandler, + CosmosRequestHandler documentFeedHandler, CosmosRequestHandler pointOperationHandler) { - if (doucumentFeedHandler == null) + if (documentFeedHandler == null) { - throw new ArgumentNullException(nameof(doucumentFeedHandler)); + throw new ArgumentNullException(nameof(documentFeedHandler)); } if (pointOperationHandler == null) @@ -34,7 +31,7 @@ public RouterHandler( throw new ArgumentNullException(nameof(pointOperationHandler)); } - this.doucumentFeedHandler = doucumentFeedHandler; + this.documentFeedHandler = documentFeedHandler; this.pointOperationHandler = pointOperationHandler; } @@ -43,9 +40,9 @@ public override Task SendAsync( CancellationToken cancellationToken) { CosmosRequestHandler targetHandler = null; - if (request.OperationType == OperationType.ReadFeed && request.ResourceType == ResourceType.Document) + if (request.IsDocumentFeedOperation) { - targetHandler = doucumentFeedHandler; + targetHandler = documentFeedHandler; } else { diff --git a/Microsoft.Azure.Cosmos/src/Headers/CosmosRequestMessageHeaders.cs b/Microsoft.Azure.Cosmos/src/Headers/CosmosRequestMessageHeaders.cs index a0af438bde..aaba30f60b 100644 --- a/Microsoft.Azure.Cosmos/src/Headers/CosmosRequestMessageHeaders.cs +++ b/Microsoft.Azure.Cosmos/src/Headers/CosmosRequestMessageHeaders.cs @@ -32,6 +32,9 @@ public class CosmosRequestMessageHeaders : CosmosMessageHeadersBase [CosmosKnownHeaderAttribute(HeaderName = HttpConstants.HttpHeaders.OfferThroughput)] internal string OfferThroughput { get; set; } + [CosmosKnownHeaderAttribute(HeaderName = HttpConstants.HttpHeaders.IfNoneMatch)] + internal string IfNoneMatch { get; set; } + private static KeyValuePair[] knownHeaderProperties = CosmosMessageHeadersInternal.GetHeaderAttributes(); internal override Dictionary CreateKnownDictionary() diff --git a/Microsoft.Azure.Cosmos/src/Query/StandByFeedContinuationToken.cs b/Microsoft.Azure.Cosmos/src/Query/StandByFeedContinuationToken.cs new file mode 100644 index 0000000000..cd716faa4e --- /dev/null +++ b/Microsoft.Azure.Cosmos/src/Query/StandByFeedContinuationToken.cs @@ -0,0 +1,184 @@ +//------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +//------------------------------------------------------------ + +namespace Microsoft.Azure.Cosmos.Query +{ + using System.Collections.Generic; + using System; + using System.Linq; + using Newtonsoft.Json; + using System.Threading.Tasks; + using System.Diagnostics; + + /// + /// Stand by continuation token representing a contiguous read over all the ranges with continuation state across all ranges. + /// + /// + /// The StandByFeed token represents the state of continuation tokens across all Partition Key Ranges and can be used to sequentially read the Change Feed for each range while maintaining a global state by serializing the values (and allowing deserialization). + /// + internal class StandByFeedContinuationToken + { + internal delegate Task> PartitionKeyRangeCacheDelegate(string containerRid, Documents.Routing.Range ranges, bool forceRefresh); + + private readonly string containerRid; + private readonly PartitionKeyRangeCacheDelegate pkRangeCacheDelegate; + private readonly string inputContinuationToken; + + private Queue compositeContinuationTokens; + private CompositeContinuationToken currentToken; + + public static async Task CreateAsync( + string containerRid, + string initialStandByFeedContinuationToken, + PartitionKeyRangeCacheDelegate pkRangeCacheDelegate) + { + StandByFeedContinuationToken standByFeedContinuationToken = new StandByFeedContinuationToken(containerRid, initialStandByFeedContinuationToken, pkRangeCacheDelegate); + await standByFeedContinuationToken.EnsureInitializedAsync(); + return standByFeedContinuationToken; + } + + private StandByFeedContinuationToken( + string containerRid, + string initialStandByFeedContinuationToken, + PartitionKeyRangeCacheDelegate pkRangeCacheDelegate) + { + if (string.IsNullOrWhiteSpace(containerRid)) throw new ArgumentNullException(nameof(containerRid)); + if (pkRangeCacheDelegate == null) throw new ArgumentNullException(nameof(pkRangeCacheDelegate)); + + this.containerRid = containerRid; + this.pkRangeCacheDelegate = pkRangeCacheDelegate; + this.inputContinuationToken = initialStandByFeedContinuationToken; + } + + public async Task> GetCurrentTokenAsync(bool forceRefresh = false) + { + Debug.Assert(this.compositeContinuationTokens != null); + IReadOnlyList resolvedRanges = await this.TryGetOverlappingRangesAsync(this.currentToken.Range, forceRefresh: forceRefresh); + if (resolvedRanges.Count > 1) + { + this.HandleSplit(resolvedRanges); + } + + return new Tuple(this.currentToken, resolvedRanges[0].Id); + } + + public void MoveToNextToken() + { + CompositeContinuationToken recentToken = this.compositeContinuationTokens.Dequeue(); + this.compositeContinuationTokens.Enqueue(recentToken); + this.currentToken = this.compositeContinuationTokens.Peek(); + } + + public new string ToString() + { + Debug.Assert(this.compositeContinuationTokens != null); + if (this.compositeContinuationTokens == null) + { + return null; + } + + return JsonConvert.SerializeObject(this.compositeContinuationTokens); + } + + private void HandleSplit(IReadOnlyList keyRanges) + { + if (keyRanges == null) throw new ArgumentNullException(nameof(keyRanges)); + + // Update current + Documents.PartitionKeyRange firstRange = keyRanges[0]; + this.currentToken.Range = new Documents.Routing.Range(firstRange.MinInclusive, firstRange.MaxExclusive, true, false); + // Add children + foreach (Documents.PartitionKeyRange keyRange in keyRanges.Skip(1)) + { + this.compositeContinuationTokens.Enqueue(new CompositeContinuationToken() + { + Range = new Documents.Routing.Range(keyRange.MinInclusive, keyRange.MaxExclusive, true, false), + Token = this.currentToken.Token + }); + } + } + + private async Task EnsureInitializedAsync() + { + if (this.compositeContinuationTokens == null) + { + IEnumerable tokens = await this.BuildCompositeTokensAsync(this.inputContinuationToken); + + this.InitializeCompositeTokens(tokens); + + Debug.Assert(this.compositeContinuationTokens.Count > 0); + } + } + + private async Task> BuildCompositeTokensAsync(string initialContinuationToken) + { + if (string.IsNullOrEmpty(initialContinuationToken)) + { + // Initialize composite token with all the ranges + IReadOnlyList allRanges = await this.pkRangeCacheDelegate( + this.containerRid, + new Documents.Routing.Range( + Documents.Routing.PartitionKeyInternal.MinimumInclusiveEffectivePartitionKey, + Documents.Routing.PartitionKeyInternal.MaximumExclusiveEffectivePartitionKey, + isMinInclusive: true, + isMaxInclusive: false), + false); + + Debug.Assert(allRanges.Count != 0); + // Initial state for a scenario where user does not provide any initial continuation token. + // StartTime and StartFromBeginning can handle the logic if the user wants to start reading from any particular point in time + // After the first iteration, token will be updated with a recent value + return allRanges.Select(e => new CompositeContinuationToken() + { + Range = new Documents.Routing.Range(e.MinInclusive, e.MaxExclusive, isMinInclusive: true, isMaxInclusive: false), + Token = null, + }); + } + + try + { + return JsonConvert.DeserializeObject>(initialContinuationToken); + } + catch(JsonReaderException ex) + { + throw new ArgumentOutOfRangeException($"Provided token has an invalid format: {initialContinuationToken}", ex); + } + } + + private void InitializeCompositeTokens(IEnumerable tokens) + { + this.compositeContinuationTokens = new Queue(); + + foreach (CompositeContinuationToken token in tokens) + { + this.compositeContinuationTokens.Enqueue(token); + } + + this.currentToken = this.compositeContinuationTokens.Peek(); + } + + private async Task> TryGetOverlappingRangesAsync( + Documents.Routing.Range targetRange, + bool forceRefresh = false) + { + Debug.Assert(targetRange != null); + + IReadOnlyList keyRanges = await this.pkRangeCacheDelegate( + this.containerRid, + new Documents.Routing.Range( + targetRange.Min, + targetRange.Max, + isMaxInclusive: true, + isMinInclusive: false), + forceRefresh); + + if (keyRanges.Count == 0) + { + throw new ArgumentOutOfRangeException("RequestContinuation", $"Token contains invalid range {targetRange.Min}-{targetRange.Max}"); + } + + return keyRanges; + } + } +} diff --git a/Microsoft.Azure.Cosmos/src/RequestOptions/CosmosChangeFeedRequestOptions.cs b/Microsoft.Azure.Cosmos/src/RequestOptions/CosmosChangeFeedRequestOptions.cs new file mode 100644 index 0000000000..f88e184070 --- /dev/null +++ b/Microsoft.Azure.Cosmos/src/RequestOptions/CosmosChangeFeedRequestOptions.cs @@ -0,0 +1,82 @@ +//------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +//------------------------------------------------------------ + +namespace Microsoft.Azure.Cosmos +{ + using System; + using System.Diagnostics; + using System.Globalization; + using Microsoft.Azure.Documents; + + /// + /// The Cosmos Change Feed request options + /// + internal class CosmosChangeFeedRequestOptions : CosmosRequestOptions + { + internal const string IfNoneMatchAllHeaderValue = "*"; + + /// + /// Specifies a particular point in time to start to read the change feed. + /// + /// + /// In order to read the Change Feed from the beginning, set this to DateTime.MinValue. + /// + public virtual DateTime? StartTime { get; set; } + + /// + /// Fill the CosmosRequestMessage headers with the set properties + /// + /// The + public override void FillRequestOptions(CosmosRequestMessage request) + { + // Check if no Continuation Token is present + if (string.IsNullOrEmpty(request.Headers.IfNoneMatch)) + { + if (this.StartTime == null) + { + request.Headers.IfNoneMatch = CosmosChangeFeedRequestOptions.IfNoneMatchAllHeaderValue; + } + else if (this.StartTime != null) + { + request.Headers.Add(HttpConstants.HttpHeaders.IfModifiedSince, this.StartTime.Value.ToUniversalTime().ToString("r", CultureInfo.InvariantCulture)); + } + } + + request.Headers.Add(HttpConstants.HttpHeaders.A_IM, HttpConstants.A_IMHeaderValues.IncrementalFeed); + + base.FillRequestOptions(request); + } + + internal static void FillPartitionKeyRangeId(CosmosRequestMessage request, string partitionKeyRangeId) + { + Debug.Assert(request != null); + + if (!string.IsNullOrEmpty(partitionKeyRangeId)) + { + request.PartitionKeyRangeId = partitionKeyRangeId; + } + } + + internal static void FillContinuationToken(CosmosRequestMessage request, string continuationToken) + { + Debug.Assert(request != null); + + if (!string.IsNullOrWhiteSpace(continuationToken)) + { + // On REST level, change feed is using IfNoneMatch/ETag instead of continuation + request.Headers.IfNoneMatch = continuationToken; + } + } + + internal static void FillMaxItemCount(CosmosRequestMessage request, int? maxItemCount) + { + Debug.Assert(request != null); + + if (maxItemCount.HasValue) + { + request.Headers.Add(HttpConstants.HttpHeaders.PageSize, maxItemCount.Value.ToString(CultureInfo.InvariantCulture)); + } + } + } +} \ No newline at end of file diff --git a/Microsoft.Azure.Cosmos/src/Resource/Item/CosmosItemsCore.cs b/Microsoft.Azure.Cosmos/src/Resource/Item/CosmosItemsCore.cs index c9fbd1e42f..38b7e16993 100644 --- a/Microsoft.Azure.Cosmos/src/Resource/Item/CosmosItemsCore.cs +++ b/Microsoft.Azure.Cosmos/src/Resource/Item/CosmosItemsCore.cs @@ -6,6 +6,7 @@ namespace Microsoft.Azure.Cosmos { using System; using System.Collections.Generic; + using System.Globalization; using System.IO; using System.Net; using System.Text; @@ -364,6 +365,21 @@ public override ChangeFeedProcessorBuilder CreateChangeFeedProcessorBuilder( return new ChangeFeedProcessorBuilder(workflowName, this.container, changeFeedEstimatorCore, changeFeedEstimatorCore.ApplyBuildConfiguration); } + internal CosmosFeedResultSetIterator GetStandByFeedIterator( + string continuationToken = null, + int? maxItemCount = null, + CosmosChangeFeedRequestOptions requestOptions = null, + CancellationToken cancellationToken = default(CancellationToken)) + { + CosmosChangeFeedRequestOptions cosmosQueryRequestOptions = requestOptions as CosmosChangeFeedRequestOptions ?? new CosmosChangeFeedRequestOptions(); + + return new CosmosChangeFeedResultSetIteratorCore( + continuationToken: continuationToken, + maxItemCount: maxItemCount, + cosmosContainer: (CosmosContainerCore)this.container, + options: cosmosQueryRequestOptions); + } + internal async Task> NextResultSetAsync( int? maxItemCount, string continuationToken, diff --git a/Microsoft.Azure.Cosmos/src/Resource/QueryResponses/CosmosChangeFeedResultSetIteratorCore.cs b/Microsoft.Azure.Cosmos/src/Resource/QueryResponses/CosmosChangeFeedResultSetIteratorCore.cs new file mode 100644 index 0000000000..f3a4822c99 --- /dev/null +++ b/Microsoft.Azure.Cosmos/src/Resource/QueryResponses/CosmosChangeFeedResultSetIteratorCore.cs @@ -0,0 +1,172 @@ +//------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +//------------------------------------------------------------ + +namespace Microsoft.Azure.Cosmos +{ + using System; + using System.Net; + using System.Threading; + using System.Threading.Tasks; + using Microsoft.Azure.Cosmos.Query; + using Microsoft.Azure.Cosmos.Routing; + + /// + /// Cosmos Stand-By Feed iterator implementing Composite Continuation Token + /// + internal class CosmosChangeFeedResultSetIteratorCore : CosmosFeedResultSetIterator + { + private const int DefaultMaxItemCount = 100; + private const string PageSizeErrorOnChangeFeedText = "Reduce page size and try again."; + + internal StandByFeedContinuationToken compositeContinuationToken; + + private readonly CosmosContainerCore cosmosContainer; + private readonly int? originalMaxItemCount; + private string containerRid; + private string continuationToken; + private string partitionKeyRangeId; + private int? maxItemCount; + + internal CosmosChangeFeedResultSetIteratorCore( + CosmosContainerCore cosmosContainer, + string continuationToken, + int? maxItemCount, + CosmosChangeFeedRequestOptions options) + { + if (cosmosContainer == null) throw new ArgumentNullException(nameof(cosmosContainer)); + + this.cosmosContainer = cosmosContainer; + this.changeFeedOptions = options; + this.maxItemCount = maxItemCount; + this.originalMaxItemCount = maxItemCount; + this.continuationToken = continuationToken; + this.HasMoreResults = true; + } + + /// + /// The query options for the result set + /// + protected readonly CosmosChangeFeedRequestOptions changeFeedOptions; + + /// + /// Get the next set of results from the cosmos service + /// + /// (Optional) representing request cancellation. + /// A query response from cosmos service + public override async Task FetchNextSetAsync(CancellationToken cancellationToken = default(CancellationToken)) + { + cancellationToken.ThrowIfCancellationRequested(); + + if (this.compositeContinuationToken == null) + { + PartitionKeyRangeCache pkRangeCache = await this.cosmosContainer.Client.DocumentClient.GetPartitionKeyRangeCacheAsync(); + this.containerRid = await this.cosmosContainer.GetRID(cancellationToken); + this.compositeContinuationToken = await StandByFeedContinuationToken.CreateAsync(this.containerRid, this.continuationToken, pkRangeCache.TryGetOverlappingRangesAsync); + } + + (CompositeContinuationToken currentRangeToken, string rangeId) = await this.compositeContinuationToken.GetCurrentTokenAsync(); + this.partitionKeyRangeId = rangeId; + this.continuationToken = currentRangeToken.Token; + + CosmosResponseMessage response = await this.NextResultSetDelegate(this.continuationToken, this.partitionKeyRangeId, this.maxItemCount, this.changeFeedOptions, cancellationToken); + if (await this.ShouldRetryFailureAsync(response, cancellationToken)) + { + cancellationToken.ThrowIfCancellationRequested(); + + (CompositeContinuationToken currentRangeTokenForRetry, string rangeIdForRetry) = await this.compositeContinuationToken.GetCurrentTokenAsync(); + currentRangeToken = currentRangeTokenForRetry; + this.partitionKeyRangeId = rangeIdForRetry; + this.continuationToken = currentRangeToken.Token; + response = await this.NextResultSetDelegate(this.continuationToken, this.partitionKeyRangeId, this.maxItemCount, this.changeFeedOptions, cancellationToken); + } + + // Change Feed read uses Etag for continuation + string responseContinuationToken = response.Headers.ETag; + bool hasMoreResults = response.StatusCode != HttpStatusCode.NotModified; + if (!hasMoreResults) + { + // Current Range is done, push it to the end + this.compositeContinuationToken.MoveToNextToken(); + } + else if (response.IsSuccessStatusCode) + { + currentRangeToken.Token = responseContinuationToken; + } + + // Send to the user the composite state for all ranges + response.Headers.Continuation = this.compositeContinuationToken.ToString(); + return response; + } + + /// + /// During Feed read, split can happen or Max Item count can go beyond the max response size + /// + internal async Task ShouldRetryFailureAsync( + CosmosResponseMessage response, + CancellationToken cancellationToken = default(CancellationToken)) + { + if (response.IsSuccessStatusCode || response.StatusCode == HttpStatusCode.NotModified) + { + if (this.maxItemCount != this.originalMaxItemCount) + { + this.maxItemCount = this.originalMaxItemCount; // Reset after successful execution. + } + + return false; + } + + bool partitionSplit = response.StatusCode == HttpStatusCode.Gone + && (response.Headers.SubStatusCode == Documents.SubStatusCodes.PartitionKeyRangeGone || response.Headers.SubStatusCode == Documents.SubStatusCodes.CompletingSplit); + if (partitionSplit) + { + // Forcing stale refresh of Partition Key Ranges Cache + await this.compositeContinuationToken.GetCurrentTokenAsync(forceRefresh: true); + return true; + } + + bool pageSizeError = response.ErrorMessage.Contains(CosmosChangeFeedResultSetIteratorCore.PageSizeErrorOnChangeFeedText); + if (pageSizeError) + { + if (!this.maxItemCount.HasValue) + { + this.maxItemCount = CosmosChangeFeedResultSetIteratorCore.DefaultMaxItemCount; + } + else if (this.maxItemCount <= 1) + { + return false; + } + + this.maxItemCount /= 2; + return true; + } + + return false; + } + + internal virtual Task NextResultSetDelegate( + string continuationToken, + string partitionKeyRangeId, + int? maxItemCount, + CosmosChangeFeedRequestOptions options, + CancellationToken cancellationToken) + { + Uri resourceUri = this.cosmosContainer.LinkUri; + return ExecUtils.ProcessResourceOperationAsync( + client: this.cosmosContainer.Database.Client, + resourceUri: resourceUri, + resourceType: Documents.ResourceType.Document, + operationType: Documents.OperationType.ReadFeed, + requestOptions: options, + requestEnricher: request => { + CosmosChangeFeedRequestOptions.FillContinuationToken(request, continuationToken); + CosmosChangeFeedRequestOptions.FillMaxItemCount(request, maxItemCount); + CosmosChangeFeedRequestOptions.FillPartitionKeyRangeId(request, partitionKeyRangeId); + }, + responseCreator: response => response, + partitionKey: null, + streamPayload: null, + cancellationToken: cancellationToken); + } + } +} \ No newline at end of file diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosItemChangeFeedTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosItemChangeFeedTests.cs new file mode 100644 index 0000000000..0d8775af3c --- /dev/null +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosItemChangeFeedTests.cs @@ -0,0 +1,362 @@ +//------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +//------------------------------------------------------------ + +namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests +{ + using System; + using System.Collections.Generic; + using System.Collections.ObjectModel; + using System.Globalization; + using System.Threading; + using System.Threading.Tasks; + using Microsoft.Azure.Cosmos.Query; + using Microsoft.VisualStudio.TestTools.UnitTesting; + using Newtonsoft.Json; + + [TestClass] + public class CosmosItemChangeFeedTests : BaseCosmosClientHelper + { + private CosmosContainer Container = null; + private CosmosDefaultJsonSerializer jsonSerializer = null; + + [TestInitialize] + public async Task TestInitialize() + { + await base.TestInit(); + string PartitionKey = "/status"; + CosmosContainerResponse response = await this.database.Containers.CreateContainerAsync( + new CosmosContainerSettings(id: Guid.NewGuid().ToString(), partitionKeyPath: PartitionKey), + cancellationToken: this.cancellationToken); + Assert.IsNotNull(response); + Assert.IsNotNull(response.Container); + Assert.IsNotNull(response.Resource); + this.Container = response; + this.jsonSerializer = new CosmosDefaultJsonSerializer(); + } + + [TestCleanup] + public async Task Cleanup() + { + await base.TestCleanup(); + } + + /// + /// Test to verify that StartFromBeginning works as expected by inserting 25 items, reading them all, then taking the last continuationtoken, + /// inserting another 25, and verifying that the iterator continues from the saved token and reads the second 25 for a total of 50 documents. + /// + [TestMethod] + public async Task StandByFeedIterator() + { + int totalCount = 0; + string lastcontinuation = string.Empty; + int firstRunTotal = 25; + int batchSize = 25; + Documents.Routing.Range previousRange = null; + Documents.Routing.Range currentRange = null; + + int pkRangesCount = (await this.Container.Client.DocumentClient.ReadPartitionKeyRangeFeedAsync(this.Container.LinkUri)).Count; + int visitedPkRanges = 0; + + await CreateRandomItems(batchSize, randomPartitionKey: true); + CosmosItemsCore itemsCore = (CosmosItemsCore)this.Container.Items; + CosmosFeedResultSetIterator setIterator = itemsCore.GetStandByFeedIterator(requestOptions: new CosmosChangeFeedRequestOptions() { StartTime = DateTime.MinValue }); + + while (setIterator.HasMoreResults) + { + using (CosmosResponseMessage responseMessage = + await setIterator.FetchNextSetAsync(this.cancellationToken)) + { + lastcontinuation = responseMessage.Headers.Continuation; + var deserializedToken = JsonConvert.DeserializeObject>(lastcontinuation); + currentRange = deserializedToken[0].Range; + Assert.AreEqual(pkRangesCount, deserializedToken.Count); + if (responseMessage.IsSuccessStatusCode) + { + Collection response = new CosmosDefaultJsonSerializer().FromStream>(responseMessage.Content).Data; + totalCount += response.Count; + } + + if (!currentRange.Equals(previousRange)) + { + visitedPkRanges++; + } + + if (visitedPkRanges == pkRangesCount && responseMessage.StatusCode == System.Net.HttpStatusCode.NotModified) break; + previousRange = currentRange; + } + + } + Assert.AreEqual(firstRunTotal, totalCount); + + int expectedFinalCount = 50; + previousRange = null; + currentRange = null; + visitedPkRanges = 0; + + // Insert another batch of 25 and use the last continuation token from the first cycle + await CreateRandomItems(batchSize, randomPartitionKey: true); + CosmosFeedResultSetIterator setIteratorNew = + itemsCore.GetStandByFeedIterator(lastcontinuation); + + while (setIteratorNew.HasMoreResults) + { + using (CosmosResponseMessage responseMessage = + await setIteratorNew.FetchNextSetAsync(this.cancellationToken)) + { + lastcontinuation = responseMessage.Headers.Continuation; + currentRange = JsonConvert.DeserializeObject>(lastcontinuation)[0].Range; + + if (responseMessage.IsSuccessStatusCode) + { + Collection response = new CosmosDefaultJsonSerializer().FromStream>(responseMessage.Content).Data; + totalCount += response.Count; + } + + if(!currentRange.Equals(previousRange)) + { + visitedPkRanges++; + } + + if (visitedPkRanges == pkRangesCount && responseMessage.StatusCode == System.Net.HttpStatusCode.NotModified) break; + previousRange = currentRange; + } + + } + + Assert.AreEqual(expectedFinalCount, totalCount); + } + + /// + /// Test that verifies that, if the token contains an invalid range, we throw. + /// + [TestMethod] + [ExpectedException(typeof(ArgumentOutOfRangeException))] + public async Task StandByFeedIterator_WithInexistentRange() + { + // Add some random range, this will force the failure + List corruptedTokens = new List(); + corruptedTokens.Add(new CompositeContinuationToken() + { + Range = new Documents.Routing.Range("whatever", "random", true, false), + Token = "oops" + }); + + string corruptedTokenSerialized = JsonConvert.SerializeObject(corruptedTokens); + + CosmosItemsCore itemsCore = (CosmosItemsCore)this.Container.Items; + CosmosFeedResultSetIterator setIteratorNew = + itemsCore.GetStandByFeedIterator(corruptedTokenSerialized); + + CosmosResponseMessage responseMessage = + await setIteratorNew.FetchNextSetAsync(this.cancellationToken); + + Assert.Fail("Should have thrown."); + } + + /// + /// Test that verifies that MaxItemCount is honored by checking the count of documents in the responses. + /// + [TestMethod] + public async Task StandByFeedIterator_WithMaxItemCount() + { + await CreateRandomItems(2, randomPartitionKey: true); + CosmosItemsCore itemsCore = (CosmosItemsCore)this.Container.Items; + CosmosFeedResultSetIterator setIterator = itemsCore.GetStandByFeedIterator(maxItemCount: 1, requestOptions: new CosmosChangeFeedRequestOptions() { StartTime = DateTime.MinValue }); + + while (setIterator.HasMoreResults) + { + using (CosmosResponseMessage responseMessage = + await setIterator.FetchNextSetAsync(this.cancellationToken)) + { + if (responseMessage.IsSuccessStatusCode) + { + Collection response = new CosmosDefaultJsonSerializer().FromStream>(responseMessage.Content).Data; + if (response.Count > 0) + { + Assert.AreEqual(1, response.Count); + return; + } + } + } + + } + + Assert.Fail("Found no batch with size 1"); + } + + /// + /// Test that does not use FetchNextSetAsync but creates new iterators passing along the previous one's continuationtoken. + /// + [TestMethod] + public async Task StandByFeedIterator_NoFetchNext() + { + var pkRanges = await this.Container.Client.DocumentClient.ReadPartitionKeyRangeFeedAsync(this.Container.LinkUri); + + int expected = 25; + int iterations = 0; + await CreateRandomItems(expected, randomPartitionKey: true); + CosmosItemsCore itemsCore = (CosmosItemsCore)this.Container.Items; + string continuationToken = null; + int count = 0; + while (true) + { + CosmosChangeFeedRequestOptions requestOptions = new CosmosChangeFeedRequestOptions() { StartTime = DateTime.MinValue }; + + CosmosFeedResultSetIterator setIterator = itemsCore.GetStandByFeedIterator(continuationToken, requestOptions: requestOptions); + using (CosmosResponseMessage responseMessage = + await setIterator.FetchNextSetAsync(this.cancellationToken)) + { + continuationToken = responseMessage.Headers.Continuation; + if (responseMessage.IsSuccessStatusCode) + { + Collection response = new CosmosDefaultJsonSerializer().FromStream>(responseMessage.Content).Data; + count += response.Count; + } + } + + if (count > expected) + { + Assert.Fail($"{count} does not equal {expected}"); + } + + if (count.Equals(expected)) + { + break; + } + + if (iterations++ > pkRanges.Count) + { + Assert.Fail("Feed does not contain all elements even after looping through PK ranges. Either the continuation is not moving forward or there is some state problem."); + + } + } + } + + /// + /// Verifies that the internal delegate for PKRangeCache gets called with forceRefresh true after a split. + /// + [TestMethod] + public async Task StandByFeedIterator_VerifyRefreshIsCalledOnSplit() + { + CosmosChangeFeedResultSetIteratorCoreMock iterator = new CosmosChangeFeedResultSetIteratorCoreMock((CosmosContainerCore)this.Container, "", 100, new CosmosChangeFeedRequestOptions()); + using (CosmosResponseMessage responseMessage = + await iterator.FetchNextSetAsync(this.cancellationToken)) + { + Assert.IsTrue(iterator.HasCalledForceRefresh); + Assert.IsTrue(iterator.Iteration > 1); + Assert.AreEqual(responseMessage.StatusCode, System.Net.HttpStatusCode.NotModified); + } + } + + private async Task> CreateRandomItems(int pkCount, int perPKItemCount = 1, bool randomPartitionKey = true) + { + Assert.IsFalse(!randomPartitionKey && perPKItemCount > 1); + + List createdList = new List(); + for (int i = 0; i < pkCount; i++) + { + string pk = "TBD"; + if (randomPartitionKey) + { + pk += Guid.NewGuid().ToString(); + } + + for (int j = 0; j < perPKItemCount; j++) + { + ToDoActivity temp = CreateRandomToDoActivity(pk); + + createdList.Add(temp); + + await this.Container.Items.CreateItemAsync(partitionKey: temp.status, item: temp); + } + } + + return createdList; + } + + private ToDoActivity CreateRandomToDoActivity(string pk = null) + { + if (string.IsNullOrEmpty(pk)) + { + pk = "TBD" + Guid.NewGuid().ToString(); + } + + return new ToDoActivity() + { + id = Guid.NewGuid().ToString(), + description = "CreateRandomToDoActivity", + status = pk, + taskNum = 42, + cost = double.MaxValue + }; + } + + private class CosmosChangeFeedResultSetIteratorCoreMock : CosmosChangeFeedResultSetIteratorCore + { + public int Iteration = 0; + public bool HasCalledForceRefresh = false; + + internal CosmosChangeFeedResultSetIteratorCoreMock( + CosmosContainerCore cosmosContainer, + string continuationToken, + int? maxItemCount, + CosmosChangeFeedRequestOptions options) : base(cosmosContainer, continuationToken, maxItemCount, options) + { + List compositeContinuationTokens = new List() + { + new CompositeContinuationToken() + { + Token = null, + Range = new Documents.Routing.Range("A", "B", true, false) + } + }; + + string serialized = JsonConvert.SerializeObject(compositeContinuationTokens); + + this.compositeContinuationToken = StandByFeedContinuationToken.CreateAsync("containerRid", serialized, (string containerRid, Documents.Routing.Range ranges, bool forceRefresh) => + { + IReadOnlyList filteredRanges = new List() + { + new Documents.PartitionKeyRange() { MinInclusive = "A", MaxExclusive ="B", Id = "0" } + }; + + if (forceRefresh) + { + this.HasCalledForceRefresh = true; + } + + return Task.FromResult(filteredRanges); + }).Result; + } + + internal override Task NextResultSetDelegate( + string continuationToken, + string partitionKeyRangeId, + int? maxItemCount, + CosmosChangeFeedRequestOptions options, + CancellationToken cancellationToken) + { + if (this.Iteration ++ == 0) + { + CosmosResponseMessage httpResponse = new CosmosResponseMessage(System.Net.HttpStatusCode.Gone); + httpResponse.Headers.Add(Documents.WFConstants.BackendHeaders.SubStatus, ((uint)Documents.SubStatusCodes.PartitionKeyRangeGone).ToString(CultureInfo.InvariantCulture)); + + return Task.FromResult(httpResponse); + } + + return Task.FromResult(new CosmosResponseMessage(System.Net.HttpStatusCode.NotModified)); + } + } + + + public class ToDoActivity + { + public string id { get; set; } + public int taskNum { get; set; } + public double cost { get; set; } + public string description { get; set; } + public string status { get; set; } + } + } +} diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosResponseMessageTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosResponseMessageTests.cs new file mode 100644 index 0000000000..188f9bf669 --- /dev/null +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosResponseMessageTests.cs @@ -0,0 +1,52 @@ +//------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +//------------------------------------------------------------ + +namespace Microsoft.Azure.Cosmos.Tests +{ + using System; + using System.Collections.Generic; + using System.Collections.Specialized; + using System.Linq; + using System.Reflection; + using Microsoft.Azure.Cosmos.Internal; + using Microsoft.Azure.Documents; + using Microsoft.VisualStudio.TestTools.UnitTesting; + + [TestClass] + public class CosmosResponseMessageTests + { + [TestMethod] + public void IsDocumentFeed_ForDocumentReads() + { + CosmosRequestMessage request = new CosmosRequestMessage(); + request.OperationType = OperationType.ReadFeed; + request.ResourceType = ResourceType.Document; + Assert.IsTrue(request.IsDocumentFeedOperation); + } + + [TestMethod] + public void IsDocumentFeed_ForChangeFeed() + { + CosmosRequestMessage request = new CosmosRequestMessage(); + request.OperationType = OperationType.ReadFeed; + request.ResourceType = ResourceType.Document; + request.PartitionKeyRangeId = "something"; + Assert.IsFalse(request.IsDocumentFeedOperation); + } + + [TestMethod] + public void IsDocumentFeed_ForOtherOperations() + { + CosmosRequestMessage request = new CosmosRequestMessage(); + request.OperationType = OperationType.Upsert; + request.ResourceType = ResourceType.Document; + Assert.IsFalse(request.IsDocumentFeedOperation); + + CosmosRequestMessage request2 = new CosmosRequestMessage(); + request2.OperationType = OperationType.ReadFeed; + request2.ResourceType = ResourceType.Database; + Assert.IsFalse(request2.IsDocumentFeedOperation); + } + } +} diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/StandByFeedContinuationTokenTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/StandByFeedContinuationTokenTests.cs new file mode 100644 index 0000000000..8a91fca3ab --- /dev/null +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/StandByFeedContinuationTokenTests.cs @@ -0,0 +1,303 @@ +//------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +//------------------------------------------------------------ + +namespace Microsoft.Azure.Cosmos +{ + using System; + using System.Collections.Generic; + using System.Linq; + using System.Threading.Tasks; + using Microsoft.Azure.Cosmos.Query; + using Microsoft.VisualStudio.TestTools.UnitTesting; + using Newtonsoft.Json; + + [TestClass] + public class StandByFeedContinuationTokenTests + { + private const string ContainerRid = "containerRid"; + + [TestMethod] + public async Task EnsureInitialized_CreatesToken_WithNoInitialContinuation() + { + IReadOnlyList keyRanges = new List() + { + new Documents.PartitionKeyRange() { MinInclusive = "A", MaxExclusive ="B", Id = "0" }, + new Documents.PartitionKeyRange() { MinInclusive = "C", MaxExclusive ="D", Id = "1" }, + }; + + StandByFeedContinuationToken compositeToken = await StandByFeedContinuationToken.CreateAsync(StandByFeedContinuationTokenTests.ContainerRid, null, StandByFeedContinuationTokenTests.CreateCacheFromRange(keyRanges)); + (CompositeContinuationToken token, string rangeId) = await compositeToken.GetCurrentTokenAsync(); + Assert.AreEqual(keyRanges[0].MinInclusive, token.Range.Min); + Assert.AreEqual(keyRanges[0].MaxExclusive, token.Range.Max); + Assert.AreEqual(keyRanges[0].Id, rangeId); + compositeToken.MoveToNextToken(); + (CompositeContinuationToken token2, string rangeId2) = await compositeToken.GetCurrentTokenAsync(); + Assert.AreEqual(keyRanges[1].MinInclusive, token2.Range.Min); + Assert.AreEqual(keyRanges[1].MaxExclusive, token2.Range.Max); + Assert.AreEqual(keyRanges[1].Id, rangeId2); + } + + [TestMethod] + public async Task EnsureInitialized_CreatesToken_WithInitialContinuation() + { + List compositeContinuationTokens = new List() + { + StandByFeedContinuationTokenTests.BuildTokenForRange("A", "B", "token1"), + StandByFeedContinuationTokenTests.BuildTokenForRange("C", "D", "token2") + }; + + string initialToken = JsonConvert.SerializeObject(compositeContinuationTokens); + + IReadOnlyList keyRanges = new List() + { + new Documents.PartitionKeyRange() { MinInclusive = "A", MaxExclusive ="B", Id = "0" }, + new Documents.PartitionKeyRange() { MinInclusive = "C", MaxExclusive ="D", Id = "1" }, + }; + + StandByFeedContinuationToken compositeToken = await StandByFeedContinuationToken.CreateAsync(StandByFeedContinuationTokenTests.ContainerRid, initialToken, CreateCacheFromRange(keyRanges)); + (CompositeContinuationToken token, string rangeId) = await compositeToken.GetCurrentTokenAsync(); + Assert.AreEqual(keyRanges[0].MinInclusive, token.Range.Min); + Assert.AreEqual(keyRanges[0].MaxExclusive, token.Range.Max); + Assert.AreEqual(keyRanges[0].Id, rangeId); + Assert.AreEqual(compositeContinuationTokens[0].Token, token.Token); + + compositeToken.MoveToNextToken(); + (CompositeContinuationToken token2, string rangeId2) = await compositeToken.GetCurrentTokenAsync(); + Assert.AreEqual(keyRanges[1].MinInclusive, token2.Range.Min); + Assert.AreEqual(keyRanges[1].MaxExclusive, token2.Range.Max); + Assert.AreEqual(keyRanges[1].Id, rangeId2); + Assert.AreEqual(compositeContinuationTokens[1].Token, token2.Token); + } + + [TestMethod] + public async Task SerializationIsExpected() + { + List compositeContinuationTokens = new List() + { + StandByFeedContinuationTokenTests.BuildTokenForRange("A", "B", "C"), + StandByFeedContinuationTokenTests.BuildTokenForRange("D", "E", "F") + }; + + string expected = JsonConvert.SerializeObject(compositeContinuationTokens); + + List keyRanges = new List() + { + new Documents.PartitionKeyRange() { MinInclusive = "A", MaxExclusive ="B" }, + new Documents.PartitionKeyRange() { MinInclusive = "D", MaxExclusive ="E" }, + }; + StandByFeedContinuationToken compositeToken = await StandByFeedContinuationToken.CreateAsync(StandByFeedContinuationTokenTests.ContainerRid, null, CreateCacheFromRange(keyRanges)); + (CompositeContinuationToken token, string rangeId) = await compositeToken.GetCurrentTokenAsync(); + token.Token = "C"; + compositeToken.MoveToNextToken(); + (CompositeContinuationToken token2, string rangeId2) = await compositeToken.GetCurrentTokenAsync(); + token2.Token = "F"; + compositeToken.MoveToNextToken(); + + Assert.AreEqual(expected, compositeToken.ToString()); + } + + [TestMethod] + public async Task MoveToNextTokenCircles() + { + List keyRanges = new List() + { + new Documents.PartitionKeyRange() { MinInclusive = "A", MaxExclusive ="B" }, + new Documents.PartitionKeyRange() { MinInclusive = "D", MaxExclusive ="E" }, + }; + StandByFeedContinuationToken compositeToken = await StandByFeedContinuationToken.CreateAsync(StandByFeedContinuationTokenTests.ContainerRid, null, CreateCacheFromRange(keyRanges)); + (CompositeContinuationToken token, string rangeId) = await compositeToken.GetCurrentTokenAsync(); + Assert.AreEqual(keyRanges[0].MinInclusive, token.Range.Min); + compositeToken.MoveToNextToken(); + (CompositeContinuationToken token2, string rangeId2) = await compositeToken.GetCurrentTokenAsync(); + Assert.AreEqual(keyRanges[1].MinInclusive, token2.Range.Min); + compositeToken.MoveToNextToken(); + (CompositeContinuationToken token3, string rangeId3) = await compositeToken.GetCurrentTokenAsync(); + Assert.AreEqual(keyRanges[0].MinInclusive, token3.Range.Min); + compositeToken.MoveToNextToken(); + (CompositeContinuationToken token4, string rangeId4) = await compositeToken.GetCurrentTokenAsync(); + Assert.AreEqual(keyRanges[1].MinInclusive, token4.Range.Min); + } + + [TestMethod] + public async Task HandleSplitGeneratesChildren() + { + List compositeContinuationTokens = new List() + { + StandByFeedContinuationTokenTests.BuildTokenForRange("A", "C", "token1"), + StandByFeedContinuationTokenTests.BuildTokenForRange("C", "F", "token2") + }; + + string expected = JsonConvert.SerializeObject(compositeContinuationTokens); + + List keyRangesAfterSplit = new List() + { + new Documents.PartitionKeyRange() { MinInclusive = "A", MaxExclusive ="B" }, + new Documents.PartitionKeyRange() { MinInclusive = "B", MaxExclusive ="C" }, + new Documents.PartitionKeyRange() { MinInclusive = "C", MaxExclusive ="F" }, + }; + + StandByFeedContinuationToken compositeToken = await StandByFeedContinuationToken.CreateAsync(StandByFeedContinuationTokenTests.ContainerRid, expected, CreateCacheFromRange(keyRangesAfterSplit)); + + (CompositeContinuationToken token, string rangeId) = await compositeToken.GetCurrentTokenAsync(); + // Current should be updated + Assert.AreEqual(keyRangesAfterSplit[0].MinInclusive, token.Range.Min); + Assert.AreEqual(keyRangesAfterSplit[0].MaxExclusive, token.Range.Max); + Assert.AreEqual(compositeContinuationTokens[0].Token, token.Token); + compositeToken.MoveToNextToken(); + (CompositeContinuationToken token2, string rangeId2) = await compositeToken.GetCurrentTokenAsync(); + // Next should be the original second + Assert.AreEqual(compositeContinuationTokens[1].Range.Min, token2.Range.Min); + Assert.AreEqual(compositeContinuationTokens[1].Range.Max, token2.Range.Max); + Assert.AreEqual(compositeContinuationTokens[1].Token, token2.Token); + compositeToken.MoveToNextToken(); + (CompositeContinuationToken token3, string rangeId3) = await compositeToken.GetCurrentTokenAsync(); + // Finally the new children + Assert.AreEqual(keyRangesAfterSplit[1].MinInclusive, token3.Range.Min); + Assert.AreEqual(keyRangesAfterSplit[1].MaxExclusive, token3.Range.Max); + Assert.AreEqual(compositeContinuationTokens[0].Token, token3.Token); + // And go back to the beginning + compositeToken.MoveToNextToken(); + (CompositeContinuationToken token5, string rangeId5) = await compositeToken.GetCurrentTokenAsync(); + Assert.AreEqual(keyRangesAfterSplit[0].MinInclusive, token5.Range.Min); + Assert.AreEqual(keyRangesAfterSplit[0].MaxExclusive, token5.Range.Max); + Assert.AreEqual(compositeContinuationTokens[0].Token, token5.Token); + } + + [TestMethod] + [ExpectedException(typeof(ArgumentOutOfRangeException))] + public async Task ConstructorWithInvalidTokenFormat() + { + IReadOnlyList keyRanges = new List() + { + new Documents.PartitionKeyRange() { MinInclusive = "A", MaxExclusive ="B", Id = "0" }, + new Documents.PartitionKeyRange() { MinInclusive = "C", MaxExclusive ="D", Id = "1" }, + }; + + StandByFeedContinuationToken token = await StandByFeedContinuationToken.CreateAsync("containerRid", "notatoken", CreateCacheFromRange(keyRanges)); + await token.GetCurrentTokenAsync(); + } + + [TestMethod] + [ExpectedException(typeof(ArgumentNullException))] + public async Task ConstructorWithNullContainer() + { + await StandByFeedContinuationToken.CreateAsync(null, null, null); + } + + [TestMethod] + [ExpectedException(typeof(ArgumentNullException))] + public async Task ConstructorWithNullDelegate() + { + await StandByFeedContinuationToken.CreateAsync("something", null, null); + } + + [TestMethod] + public void CosmosChangeFeedRequestOptions_ContinuationIsSet() + { + CosmosRequestMessage request = new CosmosRequestMessage(); + CosmosChangeFeedRequestOptions requestOptions = new CosmosChangeFeedRequestOptions(){ }; + + CosmosChangeFeedRequestOptions.FillContinuationToken(request, "something"); + requestOptions.FillRequestOptions(request); + + Assert.AreEqual("something", request.Headers.IfNoneMatch); + Assert.IsNull(request.Headers[Documents.HttpConstants.HttpHeaders.IfModifiedSince]); + } + + [TestMethod] + public void CosmosChangeFeedRequestOptions_DefaultValues() + { + CosmosRequestMessage request = new CosmosRequestMessage(); + CosmosChangeFeedRequestOptions requestOptions = new CosmosChangeFeedRequestOptions() { }; + + requestOptions.FillRequestOptions(request); + + Assert.AreEqual(CosmosChangeFeedRequestOptions.IfNoneMatchAllHeaderValue, request.Headers.IfNoneMatch); + Assert.IsNull(request.Headers[Documents.HttpConstants.HttpHeaders.IfModifiedSince]); + } + + [TestMethod] + public void CosmosChangeFeedRequestOptions_MaxItemSizeIsSet() + { + CosmosRequestMessage request = new CosmosRequestMessage(); + CosmosChangeFeedRequestOptions requestOptions = new CosmosChangeFeedRequestOptions() { }; + + CosmosChangeFeedRequestOptions.FillMaxItemCount(request, 10); + requestOptions.FillRequestOptions(request); + + Assert.AreEqual("10", request.Headers[Documents.HttpConstants.HttpHeaders.PageSize]); + Assert.AreEqual(CosmosChangeFeedRequestOptions.IfNoneMatchAllHeaderValue, request.Headers.IfNoneMatch); + Assert.IsNull(request.Headers[Documents.HttpConstants.HttpHeaders.IfModifiedSince]); + } + + [TestMethod] + public void CosmosChangeFeedRequestOptions_ContinuationBeatsStartTime() + { + CosmosRequestMessage request = new CosmosRequestMessage(); + CosmosChangeFeedRequestOptions requestOptions = new CosmosChangeFeedRequestOptions() + { + StartTime = new DateTime(1985, 1, 1) + }; + + CosmosChangeFeedRequestOptions.FillContinuationToken(request, "something"); + requestOptions.FillRequestOptions(request); + + Assert.AreEqual("something", request.Headers.IfNoneMatch); + Assert.IsNull(request.Headers[Documents.HttpConstants.HttpHeaders.IfModifiedSince]); + } + + [TestMethod] + public void CosmosChangeFeedRequestOptions_AddsStartTime() + { + CosmosRequestMessage request = new CosmosRequestMessage(); + CosmosChangeFeedRequestOptions requestOptions = new CosmosChangeFeedRequestOptions() + { + StartTime = new DateTime(1985, 1, 1, 0, 0,0, DateTimeKind.Utc) + }; + + requestOptions.FillRequestOptions(request); + + Assert.AreEqual("Tue, 01 Jan 1985 00:00:00 GMT", request.Headers[Documents.HttpConstants.HttpHeaders.IfModifiedSince]); + Assert.IsNull(request.Headers.IfNoneMatch); + } + + [TestMethod] + public void CosmosChangeFeedRequestOptions_AddsPartitionKeyRangeId() + { + CosmosRequestMessage request = new CosmosRequestMessage(); + CosmosChangeFeedRequestOptions requestOptions = new CosmosChangeFeedRequestOptions(); + + CosmosChangeFeedRequestOptions.FillPartitionKeyRangeId(request, "randomPK"); + + Assert.AreEqual("randomPK", request.PartitionKeyRangeId); + } + + private static StandByFeedContinuationToken.PartitionKeyRangeCacheDelegate CreateCacheFromRange(IReadOnlyList keyRanges) + { + return (string containerRid, Documents.Routing.Range ranges, bool forceRefresh) => + { + if (ranges.Max.Equals(Documents.Routing.PartitionKeyInternal.MaximumExclusiveEffectivePartitionKey)) + { + return Task.FromResult(keyRanges); + } + + IReadOnlyList filteredRanges = new List(keyRanges.Where(range=> range.MinInclusive.CompareTo(ranges.Min) >= 0 && range.MaxExclusive.CompareTo(ranges.Max) <= 0)); + + return Task.FromResult(filteredRanges); + }; + } + + private static CompositeContinuationToken BuildTokenForRange( + string min, + string max, + string token) + { + return new CompositeContinuationToken() + { + Token = token, + Range = new Documents.Routing.Range(min, max, true, false) + }; + } + } +}