Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ internal static class CosmosQueryExecutionContextFactory
{
private const string InternalPartitionKeyDefinitionProperty = "x-ms-query-partitionkey-definition";
private const string OptimisticDirectExecution = "OptimisticDirectExecution";
private const string OptimisticDirectExecutionToken = "OptimisticDirectExecutionToken";
private const string Passthrough = "Passthrough";
private const string Specialized = "Specialized";
private const int PageSizeFactorForTop = 5;
Expand Down Expand Up @@ -752,7 +753,17 @@ private static Documents.PartitionKeyDefinition GetPartitionKeyDefinition(InputP
ContainerQueryProperties containerQueryProperties,
ITrace trace)
{
if (!inputParameters.EnableOptimisticDirectExecution) return null;
if (!inputParameters.EnableOptimisticDirectExecution)
{
if (inputParameters.InitialUserContinuationToken != null
&& OptimisticDirectExecutionContinuationToken.IsOptimisticDirectExecutionContinuationToken(inputParameters.InitialUserContinuationToken))
{
throw new MalformedContinuationTokenException($"The continuation token supplied requires the Optimistic Direct Execution flag to be enabled in QueryRequestOptions for the query execution to resume. " +
$"{inputParameters.InitialUserContinuationToken}");
}

return null;
}

Debug.Assert(containerQueryProperties.ResourceId != null, "CosmosQueryExecutionContextFactory Assert!", "Container ResourceId cannot be null!");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@

namespace Microsoft.Azure.Cosmos.Query.Core.Pipeline.OptimisticDirectExecutionQuery
{
using System;
using System.Collections.Generic;
using Microsoft.Azure.Cosmos.ChangeFeed;
using Microsoft.Azure.Cosmos.CosmosElements;
using Microsoft.Azure.Cosmos.Query.Core.Exceptions;
using Microsoft.Azure.Cosmos.Query.Core.Monads;
Expand All @@ -30,6 +28,12 @@ public OptimisticDirectExecutionContinuationToken(ParallelContinuationToken toke

public Range<string> Range => this.Token.Range;

public static bool IsOptimisticDirectExecutionContinuationToken(CosmosElement continuationToken)
{
CosmosObject cosmosObjectContinuationToken = continuationToken as CosmosObject;
return !(cosmosObjectContinuationToken == null || !cosmosObjectContinuationToken.ContainsKey(OptimisticDirectExecutionToken));
}

public static CosmosElement ToCosmosElement(OptimisticDirectExecutionContinuationToken continuationToken)
{
CosmosElement inner = ParallelContinuationToken.ToCosmosElement(continuationToken.Token);
Expand All @@ -42,14 +46,14 @@ public static CosmosElement ToCosmosElement(OptimisticDirectExecutionContinuatio

public static TryCatch<OptimisticDirectExecutionContinuationToken> TryCreateFromCosmosElement(CosmosElement cosmosElement)
{
CosmosObject cosmosObjectContinuationToken = cosmosElement as CosmosObject;
if (cosmosObjectContinuationToken == null || !cosmosObjectContinuationToken.ContainsKey(OptimisticDirectExecutionToken))
if (!IsOptimisticDirectExecutionContinuationToken(cosmosElement))
{
return TryCatch<OptimisticDirectExecutionContinuationToken>.FromException(
new MalformedContinuationTokenException(
message: $"Malformed Continuation Token: Expected OptimisticDirectExecutionToken\r\n"));
new MalformedContinuationTokenException(
message: $"Malformed Continuation Token: Expected OptimisticDirectExecutionToken\r\n"));
}

CosmosObject cosmosObjectContinuationToken = (CosmosObject)cosmosElement;
TryCatch<ParallelContinuationToken> inner = ParallelContinuationToken.TryCreateFromCosmosElement(cosmosObjectContinuationToken[OptimisticDirectExecutionToken]);

return inner.Succeeded ?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests
using Cosmos.Scripts;
using Microsoft.Azure.Cosmos.Fluent;
using Microsoft.Azure.Cosmos.Linq;
using Microsoft.Azure.Cosmos.Query.Core;
using Microsoft.Azure.Cosmos.Tracing;
using Microsoft.Azure.Documents.Collections;
using Microsoft.VisualStudio.TestTools.UnitTesting;
Expand Down Expand Up @@ -786,6 +785,56 @@ public async Task QueryActivityIdWithContinuationTokenAndTraceTest()

}

[TestMethod]
public async Task TesOdeTokenCompatibilityWithNonOdePipeline()
{
string query = "select top 200 * from c";
CosmosClient client = DirectCosmosClient;
Container container = client.GetContainer(DatabaseId, ContainerId);

// Create items
for (int i = 0; i < 500; i++)
{
await container.CreateItemAsync<ToDoActivity>(ToDoActivity.CreateRandomToDoActivity());
}

QueryRequestOptions queryRequestOptions = new QueryRequestOptions
{
MaxItemCount = 50,
EnableOptimisticDirectExecution = true
};

FeedIteratorInternal feedIterator =
(FeedIteratorInternal)container.GetItemQueryStreamIterator(
query,
null,
queryRequestOptions);

ResponseMessage responseMessage = await feedIterator.ReadNextAsync(CancellationToken.None);
string continuationToken = responseMessage.ContinuationToken;

QueryRequestOptions newQueryRequestOptions = new QueryRequestOptions
{
MaxItemCount = 50,
EnableOptimisticDirectExecution = false
};

// use Continuation Token to create new iterator and use same trace
FeedIterator feedIteratorNew =
container.GetItemQueryStreamIterator(
query,
continuationToken,
newQueryRequestOptions);

while (feedIteratorNew.HasMoreResults)
{
responseMessage = await feedIteratorNew.ReadNextAsync(CancellationToken.None);
}

string expectedErrorMessage = "The continuation token supplied requires the Optimistic Direct Execution flag to be enabled in QueryRequestOptions for the query execution to resume. ";
Assert.IsTrue(responseMessage.CosmosException.ToString().Contains(expectedErrorMessage));
}

private class CustomHandler : RequestHandler
{
string correlatedActivityId;
Expand Down