Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 16 additions & 23 deletions Microsoft.Azure.Cosmos/src/AssemblyInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,36 +4,29 @@

using System.Runtime.CompilerServices;

#if !SignAssembly
[assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.Tests")]
[assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.EmulatorTests")]
[assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.Friends.Tests")]
[assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.Friends")]
[assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.Extensions.Tests")]
[assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.Extensions")]
[assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.NetFramework.Tests")]
[assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.Performance.Tests")]
[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2")]
#else
#if SignAssembly
[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2" + AssemblyRef.MoqPublicKey)]
[assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.Tests" + AssemblyRef.ProductPublicKey)]
[assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.EmulatorTests" + AssemblyRef.ProductPublicKey)]
[assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.Friends.Tests" + AssemblyRef.ProductPublicKey)]
[assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.Friends" + AssemblyRef.ProductPublicKey)]
[assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.Extensions.Tests" + AssemblyRef.ProductPublicKey)]
[assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.Extensions" + AssemblyRef.ProductPublicKey)]
[assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.NetFramework.Tests" + AssemblyRef.ProductPublicKey)]
[assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.Tests" + AssemblyRef.TestPublicKey)]
[assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.EmulatorTests" + AssemblyRef.TestPublicKey)]
[assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.Friends.Tests" + AssemblyRef.TestPublicKey)]
[assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.Friends" + AssemblyRef.TestPublicKey)]
[assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.Extensions.Tests" + AssemblyRef.TestPublicKey)]
[assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.Extensions" + AssemblyRef.ProductPublicKey)]
[assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.Extensions" + AssemblyRef.TestPublicKey)]
[assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.NetFramework.Tests" + AssemblyRef.TestPublicKey)]
[assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.Extensions.Tests" + AssemblyRef.ProductPublicKey)]
[assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.Extensions.Tests" + AssemblyRef.TestPublicKey)]
[assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.Friends.Tests" + AssemblyRef.ProductPublicKey)]
[assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.Friends.Tests" + AssemblyRef.TestPublicKey)]
[assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.Table" + AssemblyRef.ProductPublicKey)]
[assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.Table" + AssemblyRef.TestPublicKey)]
[assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.Table.Tests" + AssemblyRef.ProductPublicKey)]
[assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.Table.Tests" + AssemblyRef.TestPublicKey)]

[assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.Performance.Tests" + AssemblyRef.ProductPublicKey)]
[assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.Performance.Tests" + AssemblyRef.TestPublicKey)]
[assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.Tests" + AssemblyRef.ProductPublicKey)]
[assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.EmulatorTests" + AssemblyRef.ProductPublicKey)]
[assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.NetFramework.Tests" + AssemblyRef.ProductPublicKey)]
#else
[assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.Tests")]
[assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.EmulatorTests")]
[assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.NetFramework.Tests")]
[assembly: InternalsVisibleTo("Microsoft.Azure.Cosmos.Performance.Tests")]
[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2")]
#endif
10 changes: 10 additions & 0 deletions Microsoft.Azure.Cosmos/src/Handler/CosmosRequestMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,16 @@ public virtual Stream Content

internal OperationType OperationType { get; set; }

internal string PartitionKeyRangeId { get; set; }

internal DocumentServiceRequest DocumentServiceRequest { get; set; }

internal IDocumentClientRetryPolicy DocumentClientRetryPolicy { get; set; }

internal bool IsPropertiesInitialized => this.properties.IsValueCreated;

internal bool IsDocumentFeedOperation => this.OperationType == OperationType.ReadFeed && this.ResourceType == ResourceType.Document && string.IsNullOrEmpty(this.PartitionKeyRangeId);

/// <summary>
/// Request properties Per request context available to handlers.
/// These will not be automatically included into the wire.
Expand Down Expand Up @@ -175,6 +179,12 @@ internal DocumentServiceRequest ToDocumentServiceRequest()
serviceRequest = new DocumentServiceRequest(this.OperationType, this.ResourceType, this.RequestUri?.ToString(), this.Content, AuthorizationTokenType.PrimaryMasterKey, this.Headers.CosmosMessageHeaders);
}

// Routing to a particular PartitionKeyRangeId
if (!string.IsNullOrEmpty(this.PartitionKeyRangeId))
{
serviceRequest.RouteTo(new PartitionKeyRangeIdentity(this.PartitionKeyRangeId));
}

serviceRequest.UseStatusCodeForFailures = true;
serviceRequest.UseStatusCodeFor429 = true;
serviceRequest.Properties = this.Properties;
Expand Down
17 changes: 7 additions & 10 deletions Microsoft.Azure.Cosmos/src/Handler/RouterHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,36 +5,33 @@
namespace Microsoft.Azure.Cosmos.Handlers
{
using System;
using System.Diagnostics;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Internal;
using Microsoft.Azure.Documents;

/// <summary>
/// Handler which selects the piepline for the requested resource operation
/// </summary>
internal class RouterHandler : CosmosRequestHandler
{
private readonly CosmosRequestHandler doucumentFeedHandler;
private readonly CosmosRequestHandler documentFeedHandler;
private readonly CosmosRequestHandler pointOperationHandler;

public RouterHandler(
CosmosRequestHandler doucumentFeedHandler,
CosmosRequestHandler documentFeedHandler,
CosmosRequestHandler pointOperationHandler)
{
if (doucumentFeedHandler == null)
if (documentFeedHandler == null)
{
throw new ArgumentNullException(nameof(doucumentFeedHandler));
throw new ArgumentNullException(nameof(documentFeedHandler));
}

if (pointOperationHandler == null)
{
throw new ArgumentNullException(nameof(pointOperationHandler));
}

this.doucumentFeedHandler = doucumentFeedHandler;
this.documentFeedHandler = documentFeedHandler;
this.pointOperationHandler = pointOperationHandler;
}

Expand All @@ -43,9 +40,9 @@ public override Task<CosmosResponseMessage> SendAsync(
CancellationToken cancellationToken)
{
CosmosRequestHandler targetHandler = null;
if (request.OperationType == OperationType.ReadFeed && request.ResourceType == ResourceType.Document)
if (request.IsDocumentFeedOperation)
{
targetHandler = doucumentFeedHandler;
targetHandler = documentFeedHandler;
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ public class CosmosRequestMessageHeaders : CosmosMessageHeadersBase
[CosmosKnownHeaderAttribute(HeaderName = HttpConstants.HttpHeaders.OfferThroughput)]
internal string OfferThroughput { get; set; }

[CosmosKnownHeaderAttribute(HeaderName = HttpConstants.HttpHeaders.IfNoneMatch)]
internal string IfNoneMatch { get; set; }

private static KeyValuePair<string, PropertyInfo>[] knownHeaderProperties = CosmosMessageHeadersInternal.GetHeaderAttributes<CosmosRequestMessageHeaders>();

internal override Dictionary<string, CosmosCustomHeader> CreateKnownDictionary()
Expand Down
184 changes: 184 additions & 0 deletions Microsoft.Azure.Cosmos/src/Query/StandByFeedContinuationToken.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------

namespace Microsoft.Azure.Cosmos.Query
{
using System.Collections.Generic;
using System;
using System.Linq;
using Newtonsoft.Json;
using System.Threading.Tasks;
using System.Diagnostics;

/// <summary>
/// Stand by continuation token representing a contiguous read over all the ranges with continuation state across all ranges.
/// </summary>
/// <remarks>
/// The StandByFeed token represents the state of continuation tokens across all Partition Key Ranges and can be used to sequentially read the Change Feed for each range while maintaining a global state by serializing the values (and allowing deserialization).
/// </remarks>
internal class StandByFeedContinuationToken
{
internal delegate Task<IReadOnlyList<Documents.PartitionKeyRange>> PartitionKeyRangeCacheDelegate(string containerRid, Documents.Routing.Range<string> ranges, bool forceRefresh);

private readonly string containerRid;
private readonly PartitionKeyRangeCacheDelegate pkRangeCacheDelegate;
private readonly string inputContinuationToken;

private Queue<CompositeContinuationToken> compositeContinuationTokens;
private CompositeContinuationToken currentToken;

public static async Task<StandByFeedContinuationToken> CreateAsync(
string containerRid,
string initialStandByFeedContinuationToken,
PartitionKeyRangeCacheDelegate pkRangeCacheDelegate)
{
StandByFeedContinuationToken standByFeedContinuationToken = new StandByFeedContinuationToken(containerRid, initialStandByFeedContinuationToken, pkRangeCacheDelegate);
await standByFeedContinuationToken.EnsureInitializedAsync();
return standByFeedContinuationToken;
}

private StandByFeedContinuationToken(
string containerRid,
string initialStandByFeedContinuationToken,
PartitionKeyRangeCacheDelegate pkRangeCacheDelegate)
{
if (string.IsNullOrWhiteSpace(containerRid)) throw new ArgumentNullException(nameof(containerRid));
if (pkRangeCacheDelegate == null) throw new ArgumentNullException(nameof(pkRangeCacheDelegate));

this.containerRid = containerRid;
this.pkRangeCacheDelegate = pkRangeCacheDelegate;
this.inputContinuationToken = initialStandByFeedContinuationToken;
}

public async Task<Tuple<CompositeContinuationToken, string>> GetCurrentTokenAsync(bool forceRefresh = false)
{
Debug.Assert(this.compositeContinuationTokens != null);
IReadOnlyList<Documents.PartitionKeyRange> resolvedRanges = await this.TryGetOverlappingRangesAsync(this.currentToken.Range, forceRefresh: forceRefresh);
if (resolvedRanges.Count > 1)
{
this.HandleSplit(resolvedRanges);
}

return new Tuple<CompositeContinuationToken, string>(this.currentToken, resolvedRanges[0].Id);
}

public void MoveToNextToken()
{
CompositeContinuationToken recentToken = this.compositeContinuationTokens.Dequeue();
this.compositeContinuationTokens.Enqueue(recentToken);
this.currentToken = this.compositeContinuationTokens.Peek();
}

public new string ToString()
{
Debug.Assert(this.compositeContinuationTokens != null);
if (this.compositeContinuationTokens == null)
{
return null;
}

return JsonConvert.SerializeObject(this.compositeContinuationTokens);
}

private void HandleSplit(IReadOnlyList<Documents.PartitionKeyRange> keyRanges)
{
if (keyRanges == null) throw new ArgumentNullException(nameof(keyRanges));

// Update current
Documents.PartitionKeyRange firstRange = keyRanges[0];
this.currentToken.Range = new Documents.Routing.Range<string>(firstRange.MinInclusive, firstRange.MaxExclusive, true, false);
// Add children
foreach (Documents.PartitionKeyRange keyRange in keyRanges.Skip(1))
{
this.compositeContinuationTokens.Enqueue(new CompositeContinuationToken()
{
Range = new Documents.Routing.Range<string>(keyRange.MinInclusive, keyRange.MaxExclusive, true, false),
Token = this.currentToken.Token
});
}
}

private async Task EnsureInitializedAsync()
{
if (this.compositeContinuationTokens == null)
{
IEnumerable<CompositeContinuationToken> tokens = await this.BuildCompositeTokensAsync(this.inputContinuationToken);

this.InitializeCompositeTokens(tokens);

Debug.Assert(this.compositeContinuationTokens.Count > 0);
}
}

private async Task<IEnumerable<CompositeContinuationToken>> BuildCompositeTokensAsync(string initialContinuationToken)
{
if (string.IsNullOrEmpty(initialContinuationToken))
{
// Initialize composite token with all the ranges
IReadOnlyList<Documents.PartitionKeyRange> allRanges = await this.pkRangeCacheDelegate(
this.containerRid,
new Documents.Routing.Range<string>(
Documents.Routing.PartitionKeyInternal.MinimumInclusiveEffectivePartitionKey,
Documents.Routing.PartitionKeyInternal.MaximumExclusiveEffectivePartitionKey,
isMinInclusive: true,
isMaxInclusive: false),
false);

Debug.Assert(allRanges.Count != 0);
// Initial state for a scenario where user does not provide any initial continuation token.
// StartTime and StartFromBeginning can handle the logic if the user wants to start reading from any particular point in time
// After the first iteration, token will be updated with a recent value
return allRanges.Select(e => new CompositeContinuationToken()
{
Range = new Documents.Routing.Range<string>(e.MinInclusive, e.MaxExclusive, isMinInclusive: true, isMaxInclusive: false),
Token = null,
});
}

try
{
return JsonConvert.DeserializeObject<List<CompositeContinuationToken>>(initialContinuationToken);
}
catch(JsonReaderException ex)
{
throw new ArgumentOutOfRangeException($"Provided token has an invalid format: {initialContinuationToken}", ex);
}
}

private void InitializeCompositeTokens(IEnumerable<CompositeContinuationToken> tokens)
{
this.compositeContinuationTokens = new Queue<CompositeContinuationToken>();

foreach (CompositeContinuationToken token in tokens)
{
this.compositeContinuationTokens.Enqueue(token);
}

this.currentToken = this.compositeContinuationTokens.Peek();
}

private async Task<IReadOnlyList<Documents.PartitionKeyRange>> TryGetOverlappingRangesAsync(
Documents.Routing.Range<string> targetRange,
bool forceRefresh = false)
{
Debug.Assert(targetRange != null);

IReadOnlyList<Documents.PartitionKeyRange> keyRanges = await this.pkRangeCacheDelegate(
this.containerRid,
new Documents.Routing.Range<string>(
targetRange.Min,
targetRange.Max,
isMaxInclusive: true,
isMinInclusive: false),
forceRefresh);

if (keyRanges.Count == 0)
{
throw new ArgumentOutOfRangeException("RequestContinuation", $"Token contains invalid range {targetRange.Min}-{targetRange.Max}");
}

return keyRanges;
}
}
}
Loading