Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
d479a9e
Adding initial files
ealsur Apr 3, 2019
a2ad3e9
Using Etag for continuation
ealsur Apr 3, 2019
f75b398
Removing unused
ealsur Apr 3, 2019
8276100
Refactoring to reduce variables
ealsur Apr 4, 2019
4609093
Refactoring to use CompositeToken
ealsur Apr 4, 2019
c023cb7
Adding feed test
ealsur Apr 4, 2019
aa9aa66
Refactor through Options
ealsur Apr 4, 2019
d55e7c0
Adding public methods and comments
ealsur Apr 4, 2019
9fb1f9e
Routing through the point transport handler
ealsur Apr 4, 2019
79a6657
Moving to outer if
ealsur Apr 4, 2019
528e9ce
Adding split logic
ealsur Apr 4, 2019
1f49d7d
Adding unit tests
ealsur Apr 4, 2019
b33ae42
Adding logic to detect invalid continuation tokens
ealsur Apr 5, 2019
d040a3d
Adding JSON validation
ealsur Apr 5, 2019
d81aede
Routing based on PKRangeId
ealsur Apr 5, 2019
1855c9a
Renaming and adding more tests
ealsur Apr 5, 2019
e0e265f
Moving logic into the token
ealsur Apr 5, 2019
ba394c8
Forcing refresh on split
ealsur Apr 5, 2019
e10b32c
Addressing final coments
ealsur Apr 5, 2019
f3b6bbe
Addressing feedback
ealsur Apr 8, 2019
8aaac26
Added test to cover CT passing
ealsur Apr 8, 2019
45f635c
Refactoring and adding pkrangedelegate
ealsur Apr 8, 2019
a0413ba
Argument checks
ealsur Apr 8, 2019
15b1164
Moving contract to CosmosRequestMessage
ealsur Apr 8, 2019
1cff0bd
Refactoring make EnsureInitialized async
ealsur Apr 8, 2019
274f5bc
Moving tests to a new file
ealsur Apr 8, 2019
bebc2fe
Adding PKrange assert
ealsur Apr 8, 2019
468ee11
Refactored back to parameters outside Options
ealsur Apr 8, 2019
a725365
UT split
ealsur Apr 8, 2019
bcb85f1
Adding Start* checks
ealsur Apr 8, 2019
14934d6
Adding new tests and renames
ealsur Apr 8, 2019
95da24c
Addressing comments
ealsur Apr 8, 2019
221bcf7
Refactoring for cache tests
ealsur Apr 9, 2019
1a753b5
Adding comments to tests
ealsur Apr 9, 2019
f14f826
Adding factory method
ealsur Apr 9, 2019
39cd566
Merge branch 'master' into users/ealsur/cfpull
kirankumarkolli Apr 9, 2019
62f2037
Addressing comments
ealsur Apr 9, 2019
cbc99fa
Refactoring PKRange outside options
ealsur Apr 9, 2019
a80f4cc
Merge branch 'users/ealsur/cfpull' of https://github.com/Azure/azure-…
ealsur Apr 9, 2019
854e8ba
Addressing comments
ealsur Apr 9, 2019
5100279
Removing StartFromBeginning
ealsur Apr 9, 2019
3bb1e17
Removing extra lines
ealsur Apr 9, 2019
da7432a
Removing unnecessary ToList
ealsur Apr 9, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions Microsoft.Azure.Cosmos/src/Handler/CosmosRequestMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,16 @@ public virtual Stream Content

internal OperationType OperationType { get; set; }

internal string PartitionKeyRangeId { get; set; }

internal DocumentServiceRequest DocumentServiceRequest { get; set; }

internal IDocumentClientRetryPolicy DocumentClientRetryPolicy { get; set; }

internal bool IsPropertiesInitialized => this.properties.IsValueCreated;

internal bool IsDocumentFeedOperation => this.OperationType == OperationType.ReadFeed && this.ResourceType == ResourceType.Document && string.IsNullOrEmpty(this.PartitionKeyRangeId);
Comment thread
ealsur marked this conversation as resolved.

/// <summary>
/// Request properties Per request context available to handlers.
/// These will not be automatically included into the wire.
Expand Down Expand Up @@ -175,6 +179,12 @@ internal DocumentServiceRequest ToDocumentServiceRequest()
serviceRequest = new DocumentServiceRequest(this.OperationType, this.ResourceType, this.RequestUri?.ToString(), this.Content, AuthorizationTokenType.PrimaryMasterKey, this.Headers.CosmosMessageHeaders);
}

// Routing to a particular PartitionKeyRangeId
if (!string.IsNullOrEmpty(this.PartitionKeyRangeId))
{
serviceRequest.RouteTo(new PartitionKeyRangeIdentity(this.PartitionKeyRangeId));
Comment thread
ealsur marked this conversation as resolved.
}

serviceRequest.UseStatusCodeForFailures = true;
serviceRequest.UseStatusCodeFor429 = true;
serviceRequest.Properties = this.Properties;
Expand Down
17 changes: 7 additions & 10 deletions Microsoft.Azure.Cosmos/src/Handler/RouterHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,36 +5,33 @@
namespace Microsoft.Azure.Cosmos.Handlers
{
using System;
using System.Diagnostics;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Internal;
using Microsoft.Azure.Documents;

/// <summary>
/// Handler which selects the piepline for the requested resource operation
/// </summary>
internal class RouterHandler : CosmosRequestHandler
{
private readonly CosmosRequestHandler doucumentFeedHandler;
private readonly CosmosRequestHandler documentFeedHandler;
Comment thread
ealsur marked this conversation as resolved.
private readonly CosmosRequestHandler pointOperationHandler;

public RouterHandler(
CosmosRequestHandler doucumentFeedHandler,
CosmosRequestHandler documentFeedHandler,
CosmosRequestHandler pointOperationHandler)
{
if (doucumentFeedHandler == null)
if (documentFeedHandler == null)
{
throw new ArgumentNullException(nameof(doucumentFeedHandler));
throw new ArgumentNullException(nameof(documentFeedHandler));
}

if (pointOperationHandler == null)
{
throw new ArgumentNullException(nameof(pointOperationHandler));
}

this.doucumentFeedHandler = doucumentFeedHandler;
this.documentFeedHandler = documentFeedHandler;
this.pointOperationHandler = pointOperationHandler;
}

Expand All @@ -43,9 +40,9 @@ public override Task<CosmosResponseMessage> SendAsync(
CancellationToken cancellationToken)
{
CosmosRequestHandler targetHandler = null;
if (request.OperationType == OperationType.ReadFeed && request.ResourceType == ResourceType.Document)
if (request.IsDocumentFeedOperation)
{
targetHandler = doucumentFeedHandler;
targetHandler = documentFeedHandler;
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ public class CosmosRequestMessageHeaders : CosmosMessageHeadersBase
[CosmosKnownHeaderAttribute(HeaderName = HttpConstants.HttpHeaders.OfferThroughput)]
internal string OfferThroughput { get; set; }

[CosmosKnownHeaderAttribute(HeaderName = HttpConstants.HttpHeaders.IfNoneMatch)]
internal string IfNoneMatch { get; set; }

private static KeyValuePair<string, PropertyInfo>[] knownHeaderProperties = CosmosMessageHeadersInternal.GetHeaderAttributes<CosmosRequestMessageHeaders>();

internal override Dictionary<string, CosmosCustomHeader> CreateKnownDictionary()
Expand Down
184 changes: 184 additions & 0 deletions Microsoft.Azure.Cosmos/src/Query/StandByFeedContinuationToken.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------

namespace Microsoft.Azure.Cosmos.Query
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since change feed is no longer part of the query pipeline maybe we should move it to a new namespace.

{
using System.Collections.Generic;
using System;
using System.Linq;
using Newtonsoft.Json;
using System.Threading.Tasks;
using System.Diagnostics;

/// <summary>
/// Stand by continuation token representing a contiguous read over all the ranges with continuation state across all ranges.
Comment thread
ealsur marked this conversation as resolved.
/// </summary>
/// <remarks>
/// The StandByFeed token represents the state of continuation tokens across all Partition Key Ranges and can be used to sequentially read the Change Feed for each range while maintaining a global state by serializing the values (and allowing deserialization).
/// </remarks>
internal class StandByFeedContinuationToken
Comment thread
ealsur marked this conversation as resolved.
{
internal delegate Task<IReadOnlyList<Documents.PartitionKeyRange>> PartitionKeyRangeCacheDelegate(string containerRid, Documents.Routing.Range<string> ranges, bool forceRefresh);

private readonly string containerRid;
private readonly PartitionKeyRangeCacheDelegate pkRangeCacheDelegate;
private readonly string inputContinuationToken;

private Queue<CompositeContinuationToken> compositeContinuationTokens;
Comment thread
ealsur marked this conversation as resolved.
private CompositeContinuationToken currentToken;

public static async Task<StandByFeedContinuationToken> CreateAsync(
string containerRid,
string initialStandByFeedContinuationToken,
PartitionKeyRangeCacheDelegate pkRangeCacheDelegate)
{
StandByFeedContinuationToken standByFeedContinuationToken = new StandByFeedContinuationToken(containerRid, initialStandByFeedContinuationToken, pkRangeCacheDelegate);
await standByFeedContinuationToken.EnsureInitializedAsync();
return standByFeedContinuationToken;
}

private StandByFeedContinuationToken(
string containerRid,
string initialStandByFeedContinuationToken,
PartitionKeyRangeCacheDelegate pkRangeCacheDelegate)
{
if (string.IsNullOrWhiteSpace(containerRid)) throw new ArgumentNullException(nameof(containerRid));
if (pkRangeCacheDelegate == null) throw new ArgumentNullException(nameof(pkRangeCacheDelegate));

this.containerRid = containerRid;
this.pkRangeCacheDelegate = pkRangeCacheDelegate;
this.inputContinuationToken = initialStandByFeedContinuationToken;
}

public async Task<Tuple<CompositeContinuationToken, string>> GetCurrentTokenAsync(bool forceRefresh = false)
{
Debug.Assert(this.compositeContinuationTokens != null);
IReadOnlyList<Documents.PartitionKeyRange> resolvedRanges = await this.TryGetOverlappingRangesAsync(this.currentToken.Range, forceRefresh: forceRefresh);
if (resolvedRanges.Count > 1)
{
this.HandleSplit(resolvedRanges);
}

Comment thread
ealsur marked this conversation as resolved.
return new Tuple<CompositeContinuationToken, string>(this.currentToken, resolvedRanges[0].Id);
}

public void MoveToNextToken()
{
Comment thread
ealsur marked this conversation as resolved.
CompositeContinuationToken recentToken = this.compositeContinuationTokens.Dequeue();
this.compositeContinuationTokens.Enqueue(recentToken);
this.currentToken = this.compositeContinuationTokens.Peek();
}

public new string ToString()
{
Debug.Assert(this.compositeContinuationTokens != null);
if (this.compositeContinuationTokens == null)
{
return null;
}

return JsonConvert.SerializeObject(this.compositeContinuationTokens);
}

private void HandleSplit(IReadOnlyList<Documents.PartitionKeyRange> keyRanges)
{
if (keyRanges == null) throw new ArgumentNullException(nameof(keyRanges));

// Update current
Documents.PartitionKeyRange firstRange = keyRanges[0];
this.currentToken.Range = new Documents.Routing.Range<string>(firstRange.MinInclusive, firstRange.MaxExclusive, true, false);
// Add children
foreach (Documents.PartitionKeyRange keyRange in keyRanges.Skip(1))
{
this.compositeContinuationTokens.Enqueue(new CompositeContinuationToken()
{
Range = new Documents.Routing.Range<string>(keyRange.MinInclusive, keyRange.MaxExclusive, true, false),
Token = this.currentToken.Token
});
}
}

private async Task EnsureInitializedAsync()
Comment thread
ealsur marked this conversation as resolved.
{
if (this.compositeContinuationTokens == null)
{
IEnumerable<CompositeContinuationToken> tokens = await this.BuildCompositeTokensAsync(this.inputContinuationToken);

this.InitializeCompositeTokens(tokens);

Debug.Assert(this.compositeContinuationTokens.Count > 0);
}
}

private async Task<IEnumerable<CompositeContinuationToken>> BuildCompositeTokensAsync(string initialContinuationToken)
{
if (string.IsNullOrEmpty(initialContinuationToken))
{
// Initialize composite token with all the ranges
IReadOnlyList<Documents.PartitionKeyRange> allRanges = await this.pkRangeCacheDelegate(
this.containerRid,
new Documents.Routing.Range<string>(
Documents.Routing.PartitionKeyInternal.MinimumInclusiveEffectivePartitionKey,
Documents.Routing.PartitionKeyInternal.MaximumExclusiveEffectivePartitionKey,
isMinInclusive: true,
isMaxInclusive: false),
false);

Debug.Assert(allRanges.Count != 0);
// Initial state for a scenario where user does not provide any initial continuation token.
// StartTime and StartFromBeginning can handle the logic if the user wants to start reading from any particular point in time
// After the first iteration, token will be updated with a recent value
return allRanges.Select(e => new CompositeContinuationToken()
{
Range = new Documents.Routing.Range<string>(e.MinInclusive, e.MaxExclusive, isMinInclusive: true, isMaxInclusive: false),
Token = null,
Comment thread
ealsur marked this conversation as resolved.
});
}

try
{
return JsonConvert.DeserializeObject<List<CompositeContinuationToken>>(initialContinuationToken);
}
catch(JsonReaderException ex)
{
throw new ArgumentOutOfRangeException($"Provided token has an invalid format: {initialContinuationToken}", ex);
}
}

private void InitializeCompositeTokens(IEnumerable<CompositeContinuationToken> tokens)
Comment thread
ealsur marked this conversation as resolved.
{
this.compositeContinuationTokens = new Queue<CompositeContinuationToken>();

foreach (CompositeContinuationToken token in tokens)
{
this.compositeContinuationTokens.Enqueue(token);
}

this.currentToken = this.compositeContinuationTokens.Peek();
Comment thread
ealsur marked this conversation as resolved.
}

private async Task<IReadOnlyList<Documents.PartitionKeyRange>> TryGetOverlappingRangesAsync(
Documents.Routing.Range<string> targetRange,
bool forceRefresh = false)
{
Debug.Assert(targetRange != null);

IReadOnlyList<Documents.PartitionKeyRange> keyRanges = await this.pkRangeCacheDelegate(
this.containerRid,
new Documents.Routing.Range<string>(
targetRange.Min,
targetRange.Max,
isMaxInclusive: true,
isMinInclusive: false),
forceRefresh);

if (keyRanges.Count == 0)
{
throw new ArgumentOutOfRangeException("RequestContinuation", $"Token contains invalid range {targetRange.Min}-{targetRange.Max}");
}

return keyRanges;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------

namespace Microsoft.Azure.Cosmos
{
using System;
using System.Diagnostics;
using System.Globalization;
using Microsoft.Azure.Documents;

/// <summary>
/// The Cosmos Change Feed request options
/// </summary>
internal class CosmosChangeFeedRequestOptions : CosmosRequestOptions
Comment thread
ealsur marked this conversation as resolved.
{
internal const string IfNoneMatchAllHeaderValue = "*";

/// <summary>
/// Specifies a particular point in time to start to read the change feed.
/// </summary>
/// <remarks>
/// In order to read the Change Feed from the beginning, set this to DateTime.MinValue.
/// </remarks>
public virtual DateTime? StartTime { get; set; }

/// <summary>
/// Fill the CosmosRequestMessage headers with the set properties
/// </summary>
/// <param name="request">The <see cref="CosmosRequestMessage"/></param>
public override void FillRequestOptions(CosmosRequestMessage request)
{
// Check if no Continuation Token is present
if (string.IsNullOrEmpty(request.Headers.IfNoneMatch))
Comment thread
ealsur marked this conversation as resolved.
Comment thread
kirankumarkolli marked this conversation as resolved.
{
if (this.StartTime == null)
{
request.Headers.IfNoneMatch = CosmosChangeFeedRequestOptions.IfNoneMatchAllHeaderValue;
}
else if (this.StartTime != null)
{
request.Headers.Add(HttpConstants.HttpHeaders.IfModifiedSince, this.StartTime.Value.ToUniversalTime().ToString("r", CultureInfo.InvariantCulture));
}
}

request.Headers.Add(HttpConstants.HttpHeaders.A_IM, HttpConstants.A_IMHeaderValues.IncrementalFeed);

base.FillRequestOptions(request);
}

internal static void FillPartitionKeyRangeId(CosmosRequestMessage request, string partitionKeyRangeId)
{
Debug.Assert(request != null);

if (!string.IsNullOrEmpty(partitionKeyRangeId))
{
request.PartitionKeyRangeId = partitionKeyRangeId;
}
}

internal static void FillContinuationToken(CosmosRequestMessage request, string continuationToken)
{
Debug.Assert(request != null);
Comment thread
kirankumarkolli marked this conversation as resolved.

if (!string.IsNullOrWhiteSpace(continuationToken))
Comment thread
ealsur marked this conversation as resolved.
{
// On REST level, change feed is using IfNoneMatch/ETag instead of continuation
request.Headers.IfNoneMatch = continuationToken;
}
}

internal static void FillMaxItemCount(CosmosRequestMessage request, int? maxItemCount)
{
Debug.Assert(request != null);

if (maxItemCount.HasValue)
{
request.Headers.Add(HttpConstants.HttpHeaders.PageSize, maxItemCount.Value.ToString(CultureInfo.InvariantCulture));
}
}
}
}
16 changes: 16 additions & 0 deletions Microsoft.Azure.Cosmos/src/Resource/Item/CosmosItemsCore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ namespace Microsoft.Azure.Cosmos
{
using System;
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using System.Net;
using System.Text;
Expand Down Expand Up @@ -364,6 +365,21 @@ public override ChangeFeedProcessorBuilder CreateChangeFeedProcessorBuilder(
return new ChangeFeedProcessorBuilder(workflowName, this.container, changeFeedEstimatorCore, changeFeedEstimatorCore.ApplyBuildConfiguration);
}

internal CosmosFeedResultSetIterator GetStandByFeedIterator(
string continuationToken = null,
int? maxItemCount = null,
CosmosChangeFeedRequestOptions requestOptions = null,
CancellationToken cancellationToken = default(CancellationToken))
{
CosmosChangeFeedRequestOptions cosmosQueryRequestOptions = requestOptions as CosmosChangeFeedRequestOptions ?? new CosmosChangeFeedRequestOptions();

return new CosmosChangeFeedResultSetIteratorCore(
continuationToken: continuationToken,
maxItemCount: maxItemCount,
cosmosContainer: (CosmosContainerCore)this.container,
options: cosmosQueryRequestOptions);
}

internal async Task<CosmosQueryResponse<T>> NextResultSetAsync<T>(
int? maxItemCount,
string continuationToken,
Expand Down
Loading