Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import java.util.function.BiFunction;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please update title with more user context: Like what scenario/state user sees.

Copy link
Member Author

@xinlian12 xinlian12 Jul 1, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated to ClassCastExceptionForTopQueryWithAggregate


public class AggregateDocumentQueryExecutionContext<T extends Resource> implements IDocumentQueryExecutionComponent<T>{

Expand Down Expand Up @@ -57,8 +57,8 @@ public Flux<FeedResponse<T>> drainAsync(int maxPageSize) {
.map( superList -> {

double requestCharge = 0;
List<Document> aggregateResults = new ArrayList<Document>();
HashMap<String, String> headers = new HashMap<String, String>();
List<Document> aggregateResults = new ArrayList<>();
HashMap<String, String> headers = new HashMap<>();

for(FeedResponse<T> page : superList) {

Expand Down Expand Up @@ -104,23 +104,22 @@ public Flux<FeedResponse<T>> drainAsync(int maxPageSize) {
}

public static <T extends Resource> Flux<IDocumentQueryExecutionComponent<T>> createAsync(
Function<String, Flux<IDocumentQueryExecutionComponent<T>>> createSourceComponentFunction,
BiFunction<String, PipelinedDocumentQueryParams<T>, Flux<IDocumentQueryExecutionComponent<T>>> createSourceComponentFunction,
Collection<AggregateOperator> aggregates,
Map<String, AggregateOperator> groupByAliasToAggregateType,
List<String> groupByAliases,
boolean hasSelectValue,
String continuationToken) {
String continuationToken,
PipelinedDocumentQueryParams<T> documentQueryParams) {

return createSourceComponentFunction
.apply(continuationToken)
.map(component -> {
return new AggregateDocumentQueryExecutionContext<T>(component,
new ArrayList<>(aggregates),
groupByAliasToAggregateType,
groupByAliases,
hasSelectValue,
continuationToken);
});
.apply(continuationToken, documentQueryParams)
.map(component -> new AggregateDocumentQueryExecutionContext<T>(component,
new ArrayList<>(aggregates),
groupByAliasToAggregateType,
groupByAliases,
hasSelectValue,
continuationToken));
}

public IDocumentQueryExecutionComponent<T> getComponent() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.BiFunction;

public class DistinctDocumentQueryExecutionContext<T extends Resource> implements IDocumentQueryExecutionComponent<T> {
private final IDocumentQueryExecutionComponent<T> component;
Expand All @@ -43,9 +43,10 @@ private DistinctDocumentQueryExecutionContext(
}

public static <T extends Resource> Flux<IDocumentQueryExecutionComponent<T>> createAsync(
Function<String, Flux<IDocumentQueryExecutionComponent<T>>> createSourceComponentFunction,
BiFunction<String, PipelinedDocumentQueryParams<T>, Flux<IDocumentQueryExecutionComponent<T>>> createSourceComponentFunction,
DistinctQueryType distinctQueryType,
String continuationToken) {
String continuationToken,
PipelinedDocumentQueryParams<T> documentQueryParams) {

Utils.ValueHolder<DistinctContinuationToken> outDistinctcontinuationtoken = new Utils.ValueHolder<>();
DistinctContinuationToken distinctContinuationToken = new DistinctContinuationToken(null /*lasthash*/,
Expand All @@ -70,11 +71,9 @@ public static <T extends Resource> Flux<IDocumentQueryExecutionComponent<T>> cre

final UInt128 continuationTokenLastHash = distinctContinuationToken.getLastHash();

return createSourceComponentFunction.apply(distinctContinuationToken.getSourceToken()).map(component -> {
return new DistinctDocumentQueryExecutionContext<T>(component,
distinctQueryType,
continuationTokenLastHash);
});
return createSourceComponentFunction
.apply(distinctContinuationToken.getSourceToken(), documentQueryParams)
.map(component -> new DistinctDocumentQueryExecutionContext<T>(component, distinctQueryType, continuationTokenLastHash));
}

IDocumentQueryExecutionComponent<T> getComponent() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,20 +195,21 @@ public static <T extends Resource> Flux<? extends IDocumentQueryExecutionContext
// }
}

return PipelinedDocumentQueryExecutionContext.createAsync(
client,
resourceTypeEnum,
resourceType,
query,
cosmosQueryRequestOptions,
resourceLink,
collectionRid,
partitionedQueryExecutionInfo,
targetRanges,
initialPageSize,
isContinuationExpected,
getLazyFeedResponse,
correlatedActivityId);
PipelinedDocumentQueryParams<T> documentQueryParams = new PipelinedDocumentQueryParams<T>(
resourceTypeEnum,
resourceType,
query,
resourceLink,
collectionRid,
getLazyFeedResponse,
isContinuationExpected,
initialPageSize,
targetRanges,
partitionedQueryExecutionInfo.getQueryInfo(),
cosmosQueryRequestOptions,
correlatedActivityId);

return PipelinedDocumentQueryExecutionContext.createAsync(client, documentQueryParams);
}

public static <T extends Resource> Flux<? extends IDocumentQueryExecutionContext<T>> createReadManyQueryAsync(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.BiFunction;

public final class GroupByDocumentQueryExecutionContext<T extends Resource> implements
IDocumentQueryExecutionComponent<T> {
Expand All @@ -40,11 +40,12 @@ public final class GroupByDocumentQueryExecutionContext<T extends Resource> impl
}

public static <T extends Resource> Flux<IDocumentQueryExecutionComponent<T>> createAsync(
Function<String, Flux<IDocumentQueryExecutionComponent<T>>> createSourceComponentFunction,
BiFunction<String, PipelinedDocumentQueryParams<T>, Flux<IDocumentQueryExecutionComponent<T>>> createSourceComponentFunction,
String continuationToken,
Map<String, AggregateOperator> groupByAliasToAggregateType,
List<String> orderedAliases,
boolean hasSelectValue) {
boolean hasSelectValue,
PipelinedDocumentQueryParams<T> documentQueryParams) {
if (continuationToken != null) {
CosmosException dce = new BadRequestException(CONTINUATION_TOKEN_NOT_SUPPORTED_WITH_GROUP_BY);
return Flux.error(dce);
Expand All @@ -57,7 +58,7 @@ public static <T extends Resource> Flux<IDocumentQueryExecutionComponent<T>> cre
}
GroupingTable table = new GroupingTable(groupByAliasToAggregateType, orderedAliases, hasSelectValue);
// Have to pass non-null continuation token once supported
return createSourceComponentFunction.apply(null)
return createSourceComponentFunction.apply(null, documentQueryParams)
.map(component -> new GroupByDocumentQueryExecutionContext<>(component,
table));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,39 +77,32 @@ private OrderByDocumentQueryExecutionContext(

public static <T extends Resource> Flux<IDocumentQueryExecutionComponent<T>> createAsync(
IDocumentQueryClient client,
ResourceType resourceTypeEnum,
Class<T> resourceType,
SqlQuerySpec expression,
CosmosQueryRequestOptions cosmosQueryRequestOptions,
String resourceLink,
String collectionRid,
PartitionedQueryExecutionInfo partitionedQueryExecutionInfo,
List<PartitionKeyRange> partitionKeyRanges,
int initialPageSize,
boolean isContinuationExpected,
boolean getLazyFeedResponse,
UUID correlatedActivityId) {
PipelinedDocumentQueryParams<T> initParams) {

OrderByDocumentQueryExecutionContext<T> context = new OrderByDocumentQueryExecutionContext<T>(client,
partitionKeyRanges,
resourceTypeEnum,
resourceType,
expression,
cosmosQueryRequestOptions,
resourceLink,
partitionedQueryExecutionInfo.getQueryInfo().getRewrittenQuery(),
isContinuationExpected,
getLazyFeedResponse,
new OrderbyRowComparer<T>(partitionedQueryExecutionInfo.getQueryInfo().getOrderBy()),
collectionRid,
correlatedActivityId);
OrderByDocumentQueryExecutionContext<T> context = new OrderByDocumentQueryExecutionContext<T>(
client,
initParams.getPartitionKeyRanges(),
initParams.getResourceTypeEnum(),
initParams.getResourceType(),
initParams.getQuery(),
initParams.getCosmosQueryRequestOptions(),
initParams.getResourceLink(),
initParams.getQueryInfo().getRewrittenQuery(),
initParams.isContinuationExpected(),
initParams.isGetLazyResponseFeed(),
new OrderbyRowComparer<T>(initParams.getQueryInfo().getOrderBy()),
initParams.getCollectionRid(),
initParams.getCorrelatedActivityId());

context.setTop(initParams.getTop());

try {
context.initialize(partitionKeyRanges,
partitionedQueryExecutionInfo.getQueryInfo().getOrderBy(),
partitionedQueryExecutionInfo.getQueryInfo().getOrderByExpressions(),
initialPageSize,
ModelBridgeInternal.getRequestContinuationFromQueryRequestOptions(cosmosQueryRequestOptions));
context.initialize(
initParams.getPartitionKeyRanges(),
initParams.getQueryInfo().getOrderBy(),
initParams.getQueryInfo().getOrderByExpressions(),
initParams.getInitialPageSize(),
ModelBridgeInternal.getRequestContinuationFromQueryRequestOptions(initParams.getCosmosQueryRequestOptions()));

return Flux.just(context);
} catch (CosmosException dce) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,37 +60,29 @@ private ParallelDocumentQueryExecutionContext(

public static <T extends Resource> Flux<IDocumentQueryExecutionComponent<T>> createAsync(
IDocumentQueryClient client,
ResourceType resourceTypeEnum,
Class<T> resourceType,
SqlQuerySpec query,
CosmosQueryRequestOptions cosmosQueryRequestOptions,
String resourceLink,
String collectionRid,
PartitionedQueryExecutionInfo partitionedQueryExecutionInfo,
List<PartitionKeyRange> targetRanges,
int initialPageSize,
boolean isContinuationExpected,
boolean getLazyFeedResponse,
UUID correlatedActivityId) {

ParallelDocumentQueryExecutionContext<T> context = new ParallelDocumentQueryExecutionContext<T>(client,
targetRanges,
resourceTypeEnum,
resourceType,
query,
cosmosQueryRequestOptions,
resourceLink,
partitionedQueryExecutionInfo.getQueryInfo().getRewrittenQuery(),
collectionRid,
isContinuationExpected,
getLazyFeedResponse,
correlatedActivityId);
PipelinedDocumentQueryParams<T> initParams) {

ParallelDocumentQueryExecutionContext<T> context = new ParallelDocumentQueryExecutionContext<T>(
client,
initParams.getPartitionKeyRanges(),
initParams.getResourceTypeEnum(),
initParams.getResourceType(),
initParams.getQuery(),
initParams.getCosmosQueryRequestOptions(),
initParams.getResourceLink(),
initParams.getQueryInfo().getRewrittenQuery(),
initParams.getCollectionRid(),
initParams.isContinuationExpected(),
initParams.isGetLazyResponseFeed(),
initParams.getCorrelatedActivityId());
context.setTop(initParams.getTop());

try {
context.initialize(collectionRid,
targetRanges,
initialPageSize,
ModelBridgeInternal.getRequestContinuationFromQueryRequestOptions(cosmosQueryRequestOptions));
context.initialize(
initParams.getCollectionRid(),
initParams.getPartitionKeyRanges(),
initParams.getInitialPageSize(),
ModelBridgeInternal.getRequestContinuationFromQueryRequestOptions(initParams.getCosmosQueryRequestOptions()));
return Flux.just(context);
} catch (CosmosException dce) {
return Flux.error(dce);
Expand Down
Loading