Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@
// Licensed under the MIT License.
package com.azure.cosmos;

import com.azure.cosmos.implementation.Constants;
import com.azure.cosmos.implementation.CosmosItemProperties;
import com.azure.cosmos.implementation.Document;
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.Offer;
import com.azure.cosmos.implementation.Paths;
import com.azure.cosmos.implementation.RequestOptions;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.query.QueryInfo;
import com.azure.cosmos.models.CosmosAsyncContainerResponse;
import com.azure.cosmos.models.CosmosAsyncItemResponse;
import com.azure.cosmos.models.CosmosConflictProperties;
Expand All @@ -23,9 +26,13 @@
import com.azure.cosmos.models.ThroughputResponse;
import com.azure.cosmos.util.CosmosPagedFlux;
import com.azure.cosmos.util.UtilBridgeInternal;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.helpers.Util;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.List;
import java.util.stream.Collectors;

import static com.azure.cosmos.implementation.Utils.setContinuationTokenAndMaxItemCount;
Expand Down Expand Up @@ -389,12 +396,29 @@ private <T> CosmosPagedFlux<T> queryItemsInternal(
}

private <T> FeedResponse<T> prepareFeedResponse(FeedResponse<Document> response, Class<T> classType) {
QueryInfo queryInfo = ModelBridgeInternal.getQueryInfoFromFeedResponse(response);
if (queryInfo != null && queryInfo.hasSelectValue()) {
List<T> transformedResults = response.getResults()
.stream()
.map(d -> d.get(Constants.Properties.VALUE))
.map(object -> transform(object, classType))
.collect(Collectors.toList());

return BridgeInternal.createFeedResponseWithQueryMetrics(transformedResults,
response.getResponseHeaders(),
ModelBridgeInternal.queryMetrics(response));

}
return BridgeInternal.createFeedResponseWithQueryMetrics(
(response.getResults().stream().map(document -> ModelBridgeInternal.toObjectFromJsonSerializable(document, classType))
(response.getResults().stream().map(document -> ModelBridgeInternal.toObjectFromJsonSerializable(document
, classType))
.collect(Collectors.toList())), response.getResponseHeaders(),
ModelBridgeInternal.queryMetrics(response));
}

private <T> T transform(Object object, Class<T> classType) {
return Utils.getSimpleObjectMapper().convertValue(object, classType);
}

/**
* Reads an item.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import com.azure.cosmos.ConsistencyLevel;
import com.azure.cosmos.CosmosKeyCredential;
import com.azure.cosmos.DirectConnectionConfig;
import com.azure.cosmos.implementation.query.PipelinedDocumentQueryExecutionContext;
import com.azure.cosmos.implementation.query.QueryInfo;
import com.azure.cosmos.models.FeedOptions;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.models.PartitionKey;
Expand Down Expand Up @@ -36,8 +38,10 @@
import com.azure.cosmos.implementation.routing.PartitionKeyInternal;
import com.azure.cosmos.implementation.routing.PartitionKeyInternalHelper;
import com.azure.cosmos.models.AccessConditionType;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
Expand Down Expand Up @@ -562,10 +566,24 @@ private <T extends Resource> Flux<FeedResponse<T>> createQuery(
IDocumentQueryClient queryClient = documentQueryClientImpl(RxDocumentClientImpl.this);
Flux<? extends IDocumentQueryExecutionContext<T>> executionContext =
DocumentQueryExecutionContextFactory.createDocumentQueryExecutionContextAsync(queryClient, resourceTypeEnum, klass, sqlQuery , options, queryResourceLink, false, activityId);
return executionContext.flatMap(IDocumentQueryExecutionContext<T>::executeAsync);
return executionContext.flatMap(iDocumentQueryExecutionContext -> {
QueryInfo queryInfo = null;
if (iDocumentQueryExecutionContext instanceof PipelinedDocumentQueryExecutionContext) {
queryInfo = ((PipelinedDocumentQueryExecutionContext<T>) iDocumentQueryExecutionContext).getQueryInfo();
}
if (queryInfo != null && queryInfo.hasSelectValue()) {
QueryInfo finalQueryInfo = queryInfo;
return iDocumentQueryExecutionContext.executeAsync()
.map(tFeedResponse -> {
ModelBridgeInternal
.addQueryInfoToFeedResponse(tFeedResponse, finalQueryInfo);
return tFeedResponse;
});
}
return iDocumentQueryExecutionContext.executeAsync();
});
}


@Override
public Flux<FeedResponse<Database>> queryDatabases(String query, FeedOptions options) {
return queryDatabases(new SqlQuerySpec(query), options);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public class Utils {
Utils.simpleObjectMapper.configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true);
Utils.simpleObjectMapper.configure(JsonParser.Feature.ALLOW_TRAILING_COMMA, true);
Utils.simpleObjectMapper.configure(JsonParser.Feature.STRICT_DUPLICATE_DETECTION, true);
Utils.simpleObjectMapper.configure(DeserializationFeature.ACCEPT_FLOAT_AS_INT, false);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This changes ObjectMapper universally, this is kind of change of behviour however as v4 is not GA yet not a breaking change. I think it should be fine.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, its a universal change. If any concerns, I can create a new ObjectMapper, but this should be Ok IMHO.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this applicable for forward write paths as well?

I.e. can the write paths result in un-expected conversion of data?


Utils.simpleObjectMapper.registerModule(new AfterburnerModule());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,14 @@ public class PipelinedDocumentQueryExecutionContext<T extends Resource> implemen
private IDocumentQueryExecutionComponent<T> component;
private int actualPageSize;
private UUID correlatedActivityId;
private QueryInfo queryInfo;

private PipelinedDocumentQueryExecutionContext(IDocumentQueryExecutionComponent<T> component, int actualPageSize,
UUID correlatedActivityId) {
UUID correlatedActivityId, QueryInfo queryInfo) {
this.component = component;
this.actualPageSize = actualPageSize;
this.correlatedActivityId = correlatedActivityId;
this.queryInfo = queryInfo;

// this.executeNextSchedulingMetrics = new SchedulingStopwatch();
// this.executeNextSchedulingMetrics.Ready();
Expand Down Expand Up @@ -140,7 +142,7 @@ public static <T extends Resource> Flux<PipelinedDocumentQueryExecutionContext<T

int pageSize = Math.min(actualPageSize, Utils.getValueOrDefault(queryInfo.getTop(), (actualPageSize)));
return createTakeComponentFunction.apply(feedOptions.getRequestContinuation())
.map(c -> new PipelinedDocumentQueryExecutionContext<>(c, pageSize, correlatedActivityId));
.map(c -> new PipelinedDocumentQueryExecutionContext<>(c, pageSize, correlatedActivityId, queryInfo));
}

public static <T extends Resource> Flux<PipelinedDocumentQueryExecutionContext<T>> createReadManyAsync(
Expand All @@ -155,7 +157,7 @@ public static <T extends Resource> Flux<PipelinedDocumentQueryExecutionContext<T

// TODO: Making pagesize -1. Should be reviewed
return documentQueryExecutionComponentFlux.map(c -> new PipelinedDocumentQueryExecutionContext<>(c, -1,
activityId));
activityId, null));
}

@Override
Expand All @@ -165,4 +167,8 @@ public Flux<FeedResponse<T>> executeAsync() {
// TODO add more code here
return this.component.drainAsync(actualPageSize);
}

public QueryInfo getQueryInfo() {
return this.queryInfo;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@

package com.azure.cosmos.implementation.query.orderbyquery;

import com.azure.cosmos.implementation.Constants;
import com.azure.cosmos.implementation.Document;
import com.azure.cosmos.implementation.PartitionKeyRange;
import com.azure.cosmos.implementation.query.QueryItem;
import com.azure.cosmos.models.ModelBridgeInternal;
import com.fasterxml.jackson.databind.node.ObjectNode;

import java.util.List;

Expand All @@ -20,8 +23,8 @@ public final class OrderByRowResult<T> extends Document {
private final String backendContinuationToken;

public OrderByRowResult(
Class<T> klass,
String jsonString,
Class<T> klass,
String jsonString,
PartitionKeyRange targetRange,
String backendContinuationToken) {
super(jsonString);
Expand All @@ -35,8 +38,20 @@ public List<QueryItem> getOrderByItems() {
: (this.orderByItems = super.getList("orderByItems", QueryItem.class));
}

@SuppressWarnings("unchecked")
public T getPayload() {
return this.payload != null ? this.payload : (this.payload = super.getObject("payload", klass));
if (this.payload != null) {
return this.payload;
}
final Object object = super.get("payload");
if (klass == Document.class && !ObjectNode.class.isAssignableFrom(object.getClass())) {
Comment thread
moderakh marked this conversation as resolved.
Document document = new Document();
ModelBridgeInternal.setProperty(document, Constants.Properties.VALUE, object);
payload = (T) document;
} else {
this.payload = super.getObject("payload", klass);
}
return payload;
}

public PartitionKeyRange getSourcePartitionKeyRange() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.azure.cosmos.implementation.QueryMetrics;
import com.azure.cosmos.implementation.QueryMetricsConstants;
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
import com.azure.cosmos.implementation.query.QueryInfo;

import java.util.HashMap;
import java.util.List;
Expand All @@ -35,6 +36,7 @@ public class FeedResponse<T> implements ContinuablePage<String, T> {
private final ConcurrentMap<String, QueryMetrics> queryMetricsMap;
private final static String defaultPartition = "0";
private final FeedResponseDiagnostics feedResponseDiagnostics;
private QueryInfo queryInfo;

FeedResponse(List<T> results, Map<String, String> headers) {
this(results, headers, false, false, new ConcurrentHashMap<>());
Expand Down Expand Up @@ -400,4 +402,13 @@ private static String getValueOrNull(Map<String, String> map, String key) {
}
return null;
}

void setQueryInfo(QueryInfo queryInfo) {
this.queryInfo = queryInfo;
}

QueryInfo getQueryInfo() {
return this.queryInfo;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -615,4 +615,12 @@ public static Offer getOfferFromThroughputProperties(ThroughputProperties proper
public static ThroughputResponse createThroughputRespose(ResourceResponse<Offer> offerResourceResponse) {
return new ThroughputResponse(offerResourceResponse);
}

public static void addQueryInfoToFeedResponse(FeedResponse<?> feedResponse, QueryInfo queryInfo){
feedResponse.setQueryInfo(queryInfo);
}

public static QueryInfo getQueryInfoFromFeedResponse(FeedResponse<?> response) {
return response.getQueryInfo();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.azure.cosmos.models.CosmosUserProperties;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.models.ModelBridgeInternal;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;

import java.time.Duration;
Expand Down Expand Up @@ -74,6 +75,21 @@ public void validate(List<FeedResponse<T>> feedList) {
return this;
}

public Builder<T> containsExactlyValues(List<T> expectedValues) {
validators.add(new FeedResponseListValidator<T>() {
@Override
public void validate(List<FeedResponse<T>> feedList) {
List<T> actualValues = feedList.stream()
.flatMap(f -> f.getResults().stream())
.collect(Collectors.toList());
assertThat(actualValues)
.describedAs("Result values")
.containsExactlyElementsOf(expectedValues);
}
});
return this;
}

public Builder<T> containsExactlyIds(List<String> expectedIds) {
validators.add(new FeedResponseListValidator<T>() {
@Override
Expand Down Expand Up @@ -188,31 +204,31 @@ public void validate(List<FeedResponse<T>> feedList) {
}

public Builder<T> withAggregateValue(Object value) {
validators.add(new FeedResponseListValidator<CosmosItemProperties>() {
validators.add(new FeedResponseListValidator<JsonNode>() {
@Override
public void validate(List<FeedResponse<CosmosItemProperties>> feedList) {
List<CosmosItemProperties> list = feedList.get(0).getResults();
CosmosItemProperties result = list.size() > 0 ? list.get(0) : null;
public void validate(List<FeedResponse<JsonNode>> feedList) {
List<JsonNode> list = feedList.get(0).getResults();
JsonNode result = list.size() > 0 ? list.get(0) : null;

if (result != null) {
if (value instanceof Double) {

Double d = ModelBridgeInternal.getDoubleFromJsonSerializable(result, Constants.Properties.VALUE);
Double d = result.asDouble();
assertThat(d).isEqualTo(value);
} else if (value instanceof Integer) {

Integer d = ModelBridgeInternal.getIntFromJsonSerializable(result, Constants.Properties.VALUE);
Integer d = result.asInt();
assertThat(d).isEqualTo(value);
} else if (value instanceof String) {

String d = ModelBridgeInternal.getStringFromJsonSerializable(result, Constants.Properties.VALUE);
String d = result.asText();
assertThat(d).isEqualTo(value);
} else if (value instanceof Document){
} else if (value instanceof Document) {

assertThat(result.toString()).isEqualTo(value.toString());
} else {

assertThat(ModelBridgeInternal.getObjectFromJsonSerializable(result, Constants.Properties.VALUE)).isNull();
assertThat(result.isNull()).isTrue();
assertThat(value).isNull();
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.azure.cosmos.implementation.CosmosItemProperties;
import com.azure.cosmos.models.FeedOptions;
import com.azure.cosmos.implementation.FeedResponseListValidator;
import com.fasterxml.jackson.databind.JsonNode;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Factory;
Expand Down Expand Up @@ -71,15 +72,16 @@ public AggregateQueryTests(CosmosClientBuilder clientBuilder) {
public void queryDocumentsWithAggregates(boolean qmEnabled) throws Exception {

FeedOptions options = new FeedOptions();

options.setPopulateQueryMetrics(qmEnabled);
options.setMaxDegreeOfParallelism(2);

for (QueryConfig queryConfig : queryConfigs) {

CosmosPagedFlux<CosmosItemProperties> queryObservable = createdCollection.queryItems(queryConfig.query, options, CosmosItemProperties.class);
CosmosPagedFlux<JsonNode> queryObservable = createdCollection.queryItems(queryConfig.query, options,
JsonNode.class);

FeedResponseListValidator<CosmosItemProperties> validator = new FeedResponseListValidator.Builder<CosmosItemProperties>()
FeedResponseListValidator<JsonNode> validator = new FeedResponseListValidator.Builder<JsonNode>()
.withAggregateValue(queryConfig.expected)
.numberOfPages(1)
.hasValidQueryMetrics(qmEnabled)
Expand Down
Loading