Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions Microsoft.Azure.Cosmos/src/CosmosClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@
namespace Microsoft.Azure.Cosmos
{
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Handlers;
using Microsoft.Azure.Cosmos.Query;
using Microsoft.Azure.Documents;
using System.Text;

/// <summary>
/// Provides a client-side logical representation of the Azure Cosmos DB database account.
Expand Down Expand Up @@ -75,6 +76,10 @@ static CosmosClient()
{
HttpConstants.Versions.CurrentVersion = HttpConstants.Versions.v2018_12_31;
HttpConstants.Versions.CurrentVersionUTF8 = Encoding.UTF8.GetBytes(HttpConstants.Versions.CurrentVersion);

// V3 always assumes assemblies exists
// Shall revisit on feedback
ServiceInteropWrapper.AssembliesExist = new Lazy<bool>(() => true);
}

/// <summary>
Expand Down Expand Up @@ -202,7 +207,7 @@ internal CosmosClient(

internal CosmosOffers Offers => this.offerSet.Value;
internal DocumentClient DocumentClient { get; set; }
internal CosmosRequestHandler RequestHandler { get; private set; }
internal RequestInvokerHandler RequestHandler { get; private set; }
internal ConsistencyLevel AccountConsistencyLevel { get; private set; }

internal CosmosResponseFactory ResponseFactory =>
Expand Down
4 changes: 2 additions & 2 deletions Microsoft.Azure.Cosmos/src/Handler/ClientPipelineBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ private set

internal CosmosRequestHandler PartitionKeyRangeHandler { get; set; }

public CosmosRequestHandler Build()
public RequestInvokerHandler Build()
{
CosmosRequestHandler root = new RequestInvokerHandler(this.client);
RequestInvokerHandler root = new RequestInvokerHandler(this.client);

CosmosRequestHandler current = root;
if (this.CustomHandlers != null && this.CustomHandlers.Any())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,7 @@ internal async Task AssertPartitioningDetailsAsync(CosmosClient client, Cancella
{
Debug.Assert(this.AssertPartitioningPropertiesAndHeaders());
}
#endif
#if !DEBUG
#else
await Task.CompletedTask;
#endif
}
Expand Down
128 changes: 126 additions & 2 deletions Microsoft.Azure.Cosmos/src/Handler/RequestInvokerHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@

namespace Microsoft.Azure.Cosmos.Handlers
{
using Microsoft.Azure.Cosmos.Internal;
using Microsoft.Azure.Documents;
using System;
using System.Globalization;
using System.IO;
using System.Net;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Routing;

/// <summary>
/// HttpMessageHandler can only be invoked by derived classed or internal classes inside http assembly
Expand Down Expand Up @@ -82,6 +85,127 @@ public override Task<CosmosResponseMessage> SendAsync(
.Unwrap();
}

public virtual async Task<T> SendAsync<T>(
Uri resourceUri,
ResourceType resourceType,
OperationType operationType,
CosmosRequestOptions requestOptions,
CosmosContainerCore cosmosContainerCore,
Object partitionKey,
Stream streamPayload,
Action<CosmosRequestMessage> requestEnricher,
Func<CosmosResponseMessage, T> responseCreator,
CancellationToken cancellation = default(CancellationToken))
{
if (responseCreator == null)
{
throw new ArgumentNullException(nameof(responseCreator));
}

CosmosResponseMessage responseMessage = await this.SendAsync(
resourceUri: resourceUri,
resourceType: resourceType,
operationType: operationType,
requestOptions: requestOptions,
cosmosContainerCore: cosmosContainerCore,
partitionKey: partitionKey,
streamPayload: streamPayload,
requestEnricher: requestEnricher,
cancellation: cancellation);

return responseCreator(responseMessage);
}

public virtual async Task<CosmosResponseMessage> SendAsync(
Uri resourceUri,
ResourceType resourceType,
OperationType operationType,
CosmosRequestOptions requestOptions,
CosmosContainerCore cosmosContainerCore,
Object partitionKey,
Stream streamPayload,
Action<CosmosRequestMessage> requestEnricher,
CancellationToken cancellation = default(CancellationToken))
{
if (resourceUri == null)
{
throw new ArgumentNullException(nameof(resourceUri));
}

HttpMethod method = RequestInvokerHandler.GetHttpMethod(operationType);

CosmosRequestMessage request = new CosmosRequestMessage(method, resourceUri);
request.OperationType = operationType;
request.ResourceType = resourceType;
request.RequestOptions = requestOptions;
request.Content = streamPayload;

if (partitionKey != null)
{
if (cosmosContainerCore == null && Object.ReferenceEquals(partitionKey, CosmosContainerSettings.NonePartitionKeyValue))
{
throw new ArgumentException($"{nameof(cosmosContainerCore)} can not be null with partition key as PartitionKey.None");
}
else if (Object.ReferenceEquals(partitionKey, CosmosContainerSettings.NonePartitionKeyValue))
{
try
{
PartitionKeyInternal partitionKeyInternal = await cosmosContainerCore.GetNonePartitionKeyValueAsync(cancellation);
request.Headers.PartitionKey = partitionKeyInternal.ToJsonString();
}
catch (DocumentClientException dce)
{
return dce.ToCosmosResponseMessage(request);
}
}
else
{
PartitionKey pk = new PartitionKey(partitionKey);
request.Headers.PartitionKey = pk.InternalKey.ToJsonString();
}
}

if (operationType == OperationType.Upsert)
{
request.Headers.IsUpsert = bool.TrueString;
}

requestEnricher?.Invoke(request);
return await this.SendAsync(request, cancellation);
}

internal static HttpMethod GetHttpMethod(
OperationType operationType)
{
HttpMethod httpMethod = HttpMethod.Head;
if (operationType == OperationType.Create ||
operationType == OperationType.Upsert ||
operationType == OperationType.Query ||
operationType == OperationType.SqlQuery ||
operationType == OperationType.Batch ||
operationType == OperationType.ExecuteJavaScript)
{
return HttpMethod.Post;
}
else if (operationType == OperationType.Read ||
operationType == OperationType.ReadFeed)
{
return HttpMethod.Get;
}
else if (operationType == OperationType.Replace)
{
return HttpMethod.Put;
}
else if (operationType == OperationType.Delete)
{
return HttpMethod.Delete;
}
else
{
throw new NotImplementedException();
}
}

private void FillMultiMasterContext(CosmosRequestMessage request)
{
if (this.client.DocumentClient.UseMultipleWriteLocations)
Expand Down
3 changes: 3 additions & 0 deletions Microsoft.Azure.Cosmos/src/Json/JsonBinaryEncoding.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ internal static class JsonBinaryEncoding
"properties",
"type",
"value",
"Feature",
"FeatureCollection",
"_id",
};

/// <summary>
Expand Down
10 changes: 5 additions & 5 deletions Microsoft.Azure.Cosmos/src/Microsoft.Azure.Cosmos.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
<Description>This client library enables client applications to connect to Azure Cosmos via the SQL API. Azure Cosmos is a globally distributed, multi-model database service. For more information, refer to http://azure.microsoft.com/services/cosmos-db/. </Description>
<Copyright>© Microsoft Corporation. All rights reserved.</Copyright>
<NeutralLanguage>en-US</NeutralLanguage>
<ClientVersion>3.0.0.10-preview</ClientVersion>
<DirectVersion>3.0.0.26-preview</DirectVersion>
<ClientVersion>3.0.0.11-preview</ClientVersion>
<DirectVersion>3.0.0.28-preview</DirectVersion>
<Version Condition=" '$(IsNightly)' == '1' ">$(ClientVersion)-nightly$(CurrentDate)</Version>
<Version Condition=" '$(IsNightly)' == '0' Or '$(IsNightly)' == '' ">$(ClientVersion)</Version>
<FileVersion>$(VersionPrefix)</FileVersion>
Expand Down Expand Up @@ -42,11 +42,11 @@
</ItemGroup>

<ItemGroup Condition=" '$(SignAssembly)' == 'true' ">
<PackageReference Include="Microsoft.Azure.Cosmos.Direct" Version="$(DirectVersion)" />
<PackageReference Include="Microsoft.Azure.Cosmos.Direct" Version="[$(DirectVersion)]" />
</ItemGroup>

<ItemGroup Condition=" '$(SignAssembly)' != 'true' ">
<PackageReference Include="Microsoft.Azure.Cosmos.Direct.MyGet" Version="$(DirectVersion)" />
<PackageReference Include="Microsoft.Azure.Cosmos.Direct.MyGet" Version="[$(DirectVersion)]" />
</ItemGroup>

<ItemGroup>
Expand All @@ -61,5 +61,5 @@
<PropertyGroup>
<DefineConstants>$(DefineConstants);DOCDBCLIENT;NETSTANDARD20</DefineConstants>
<DefineConstants Condition=" '$(SignAssembly)' == 'true' ">$(DefineConstants);SignAssembly</DefineConstants>
</PropertyGroup>
</PropertyGroup>
</Project>
2 changes: 1 addition & 1 deletion Microsoft.Azure.Cosmos/src/Query/CosmosQueryContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ internal class CosmosQueryContext
public ResourceType ResourceTypeEnum { get; }
public OperationType OperationTypeEnum { get; }
public Type ResourceType { get; }
public SqlQuerySpec SqlQuerySpec { get; }
public SqlQuerySpec SqlQuerySpec { get; internal set; }
public CosmosQueryRequestOptions QueryRequestOptions { get; }
public bool IsContinuationExpected { get; }
public bool AllowNonValueAggregateQuery { get; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ namespace Microsoft.Azure.Cosmos.Query
using Microsoft.Azure.Cosmos.CosmosElements;
using Microsoft.Azure.Cosmos.Query.ParallelQuery;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Routing;

/// <summary>
/// Factory class for creating the appropriate DocumentQueryExecutionContext for the provided type of query.
/// </summary>
internal class CosmosQueryExecutionContextFactory : IDocumentQueryExecutionContext
{
internal const string InternalPartitionKeyDefinitionProperty = "x-ms-query-partitionkey-definition";
private IDocumentQueryExecutionContext innerExecutionContext;
private CosmosQueryContext cosmosQueryContext;

Expand Down Expand Up @@ -110,11 +112,6 @@ private async Task<IDocumentQueryExecutionContext> CreateItemQueryExecutionConte
{
collection = await collectionCache.ResolveCollectionAsync(request, cancellationToken);
}

if (this.cosmosQueryContext.QueryRequestOptions != null && this.cosmosQueryContext.QueryRequestOptions.PartitionKey != null && this.cosmosQueryContext.QueryRequestOptions.PartitionKey.Equals(PartitionKey.None))
{
this.cosmosQueryContext.QueryRequestOptions.PartitionKey = PartitionKey.FromInternalKey(collection.GetNoneValue());
}
}

if(collection == null)
Expand Down Expand Up @@ -142,10 +139,31 @@ private async Task<IDocumentQueryExecutionContext> CreateItemQueryExecutionConte
//todo:elasticcollections this may rely on information from collection cache which is outdated
//if collection is deleted/created with same name.
//need to make it not rely on information from collection cache.
PartitionedQueryExecutionInfo partitionedQueryExecutionInfo = await GetPartitionedQueryExecutionInfoAsync(
queryClient: this.cosmosQueryContext.QueryClient,
PartitionKeyDefinition partitionKeyDefinition;
object partitionKeyDefinitionObject;
if (this.cosmosQueryContext.QueryRequestOptions?.Properties != null
&& this.cosmosQueryContext.QueryRequestOptions.Properties.TryGetValue(InternalPartitionKeyDefinitionProperty, out partitionKeyDefinitionObject))
{
if (partitionKeyDefinitionObject is PartitionKeyDefinition definition)
{
partitionKeyDefinition = definition;
}
else
{
throw new ArgumentException(
"partitionkeydefinition has invalid type",
nameof(partitionKeyDefinitionObject));
}
}
else
{
partitionKeyDefinition = collection.PartitionKey;
}

// $ISSUE-felixfan-2016-07-13: We should probably get PartitionedQueryExecutionInfo from Gateway in GatewayMode
PartitionedQueryExecutionInfo partitionedQueryExecutionInfo = await this.cosmosQueryContext.QueryClient.GetPartitionedQueryExecutionInfoAsync(
sqlQuerySpec: this.cosmosQueryContext.SqlQuerySpec,
partitionKeyDefinition: collection.PartitionKey,
partitionKeyDefinition: partitionKeyDefinition,
requireFormattableOrderByQuery: true,
isContinuationExpected: true,
allowNonValueAggregateQuery: this.cosmosQueryContext.AllowNonValueAggregateQuery,
Expand Down Expand Up @@ -173,6 +191,14 @@ public static async Task<IDocumentQueryExecutionContext> CreateSpecializedDocume
string collectionRid,
CancellationToken cancellationToken)
{
if (!string.IsNullOrEmpty(partitionedQueryExecutionInfo.QueryInfo.RewrittenQuery))
if (!string.IsNullOrEmpty(partitionedQueryExecutionInfo.QueryInfo?.RewrittenQuery))
{
cosmosQueryContext.SqlQuerySpec = new SqlQuerySpec(
partitionedQueryExecutionInfo.QueryInfo.RewrittenQuery,
cosmosQueryContext.SqlQuerySpec.Parameters);
}

// Figure out the optimal page size.
long initialPageSize = cosmosQueryContext.QueryRequestOptions.MaxItemCount.GetValueOrDefault(ParallelQueryConfig.GetConfig().ClientInternalPageSize);

Expand Down Expand Up @@ -256,10 +282,21 @@ internal static async Task<List<PartitionKeyRange>> GetTargetPartitionKeyRanges(
List<PartitionKeyRange> targetRanges;
if (queryRequestOptions.PartitionKey != null)
{
// Dis-ambiguate the NonePK if used
PartitionKeyInternal partitionKeyInternal = null;
if (Object.ReferenceEquals(queryRequestOptions.PartitionKey, CosmosContainerSettings.NonePartitionKeyValue))
{
partitionKeyInternal = collection.GetNoneValue();
}
else
{
partitionKeyInternal = new PartitionKey(queryRequestOptions.PartitionKey).InternalKey;
}

targetRanges = await queryClient.GetTargetPartitionKeyRangesByEpkString(
resourceLink,
collection.ResourceId,
new PartitionKey(queryRequestOptions.PartitionKey).InternalKey.GetEffectivePartitionKeyString(collection.PartitionKey));
partitionKeyInternal.GetEffectivePartitionKeyString(collection.PartitionKey));
}
else if (TryGetEpkProperty(queryRequestOptions, out string effectivePartitionKeyString))
{
Expand All @@ -279,7 +316,7 @@ internal static async Task<List<PartitionKeyRange>> GetTargetPartitionKeyRanges(
return targetRanges;
}

public static async Task<PartitionedQueryExecutionInfo> GetPartitionedQueryExecutionInfoAsync(
public static Task<PartitionedQueryExecutionInfo> GetPartitionedQueryExecutionInfoAsync(
CosmosQueryClient queryClient,
SqlQuerySpec sqlQuerySpec,
PartitionKeyDefinition partitionKeyDefinition,
Expand All @@ -290,12 +327,13 @@ public static async Task<PartitionedQueryExecutionInfo> GetPartitionedQueryExecu
{
// $ISSUE-felixfan-2016-07-13: We should probably get PartitionedQueryExecutionInfo from Gateway in GatewayMode

QueryPartitionProvider queryPartitionProvider = await queryClient.GetQueryPartitionProviderAsync(cancellationToken);
return queryPartitionProvider.GetPartitionedQueryExecutionInfo(sqlQuerySpec,
partitionKeyDefinition,
requireFormattableOrderByQuery,
isContinuationExpected,
allowNonValueAggregateQuery);
return queryClient.GetPartitionedQueryExecutionInfoAsync(
sqlQuerySpec,
partitionKeyDefinition,
requireFormattableOrderByQuery,
isContinuationExpected,
allowNonValueAggregateQuery,
cancellationToken);
}


Expand Down
Loading