Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ For example, using maven, you can add the following dependency to your maven pom
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-cosmosdb</artifactId>
<version>2.6.1</version>
<version>2.6.2</version>
</dependency>
```

Expand Down
2 changes: 1 addition & 1 deletion benchmark/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
<parent>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-cosmosdb-parent</artifactId>
<version>2.6.1</version>
<version>2.6.2</version>
</parent>

<artifactId>azure-cosmosdb-benchmark</artifactId>
Expand Down
5 changes: 5 additions & 0 deletions changelog/README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Changelog

## 2.6.2
- Fixed query failure when setting MaxItemCount to -1 ([#261](https://github.com/Azure/azure-cosmosdb-java/issues/261)).
- Fixed a NPE bug on Partitoin split ([#267](https://github.com/Azure/azure-cosmosdb-java/pull/267).
- Improved logging in Direct mode.

## 2.6.1

- Multimaster regional failover fixes for query logic ([#245](https://github.com/Azure/azure-cosmosdb-java/commit/104b4a3b30ffd7c8add01096ce6a9b6e7a3f6318))
Expand Down
2 changes: 1 addition & 1 deletion commons-test-utils/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ SOFTWARE.
<parent>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-cosmosdb-parent</artifactId>
<version>2.6.1</version>
<version>2.6.2</version>
</parent>
<artifactId>azure-cosmosdb-commons-test-utils</artifactId>
<name>Common Test Components for Testing Async SDK for SQL API of Azure Cosmos DB Service</name>
Expand Down
2 changes: 1 addition & 1 deletion commons/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ SOFTWARE.
<parent>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-cosmosdb-parent</artifactId>
<version>2.6.1</version>
<version>2.6.2</version>
</parent>
<artifactId>azure-cosmosdb-commons</artifactId>
<name>Common Components for Async SDK for SQL API of Azure Cosmos DB Service</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ public static class Versions {
// TODO: FIXME we can use maven plugin for generating a version file
// @see
// https://stackoverflow.com/questions/2469922/generate-a-version-java-file-in-maven
public static final String SDK_VERSION = "2.6.1";
public static final String SDK_VERSION = "2.6.2";
public static final String SDK_NAME = "cosmosdb-java-sdk";
public static final String QUERY_VERSION = "1.0";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,36 @@

package com.microsoft.azure.cosmosdb.internal;

import org.apache.commons.lang3.StringUtils;

import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class JavaStreamUtils {

private static <T> String safeToString(T t) {
return t != null ? t.toString() : "null";
}

public static <T> String info(Collection<T> collection) {
return collection == null ? "null collection" :
"collection size: " + collection.size();
}

public static <T> String info(T[] collection) {
return collection == null ? "null collection" :
"collection size: " + collection.length;
}

public static <T> String toString(Collection<T> collection, String delimiter) {
return collection.stream()
.map( t -> safeToString(t) )
.collect(Collectors.joining(delimiter));
return collection == null ? "null collection" :
collection.isEmpty() ? "empty collection" :
collection.stream()
.map(t -> safeToString(t))
.collect(Collectors.joining(delimiter));
}

public static <T> String toString(T[] array, String delimiter) {
return array == null ? "null array" :
toString(Arrays.asList(array), delimiter);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,14 @@ public String toHeader() {
return String.format("%s", this.partitionKeyRangeId);
}

@Override
public String toString() {
return "PartitionKeyRangeIdentity{" +
"collectionRid='" + collectionRid + '\'' +
", partitionKeyRangeId='" + partitionKeyRangeId + '\'' +
'}';
}

@Override
public boolean equals(Object other) {
if (null == other) {
Expand Down
2 changes: 1 addition & 1 deletion direct-impl/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ SOFTWARE.
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-cosmosdb-direct</artifactId>
<name>Azure Cosmos DB Async SDK Direct Internal Implementation</name>
<version>2.6.1</version>
<version>2.6.2</version>
<description>Azure Cosmos DB Async SDK Direct Internal Implementation</description>
<url>https://docs.microsoft.com/en-us/azure/cosmos-db</url>
<packaging>jar</packaging>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,9 @@ private Single<ResolutionResult> tryResolveServerPartitionAsync(
if (range == null) {
// Collection cache or routing map cache is potentially outdated. Return null -
// upper logic will refresh cache and retry.
return null;
logger.debug("Collection cache or routing map cache is potentially outdated." +
" Returning null. Upper logic will refresh cache and retry.");
return Single.just(null);
}

Single<AddressInformation[]> addressesObs = this.addressCache.tryGetAddresses(
Expand Down Expand Up @@ -295,6 +297,7 @@ private PartitionKeyRange tryResolveSinglePartitionCollection(
return (PartitionKeyRange) routingMap.getOrderedPartitionKeyRanges().get(0);
}

logger.debug("tryResolveSinglePartitionCollection: collectionCacheIsUptoDate = {}", collectionCacheIsUptoDate);
if (collectionCacheIsUptoDate) {
throw BridgeInternal.setResourceAddress(new BadRequestException(RMResources.MissingPartitionKeyValue), request.getResourceAddress());
} else {
Expand Down Expand Up @@ -573,6 +576,7 @@ private ResolutionResult handleRangeAddressResolutionFailure(
throw BridgeInternal.setResourceAddress(new PartitionKeyRangeGoneException(errorMessage), request.getResourceAddress());
}

logger.debug("handleRangeAddressResolutionFailure returns null");
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.microsoft.azure.cosmosdb.PartitionKeyRange;
import com.microsoft.azure.cosmosdb.internal.Constants;
import com.microsoft.azure.cosmosdb.internal.HttpConstants;
import com.microsoft.azure.cosmosdb.internal.JavaStreamUtils;
import com.microsoft.azure.cosmosdb.internal.OperationType;
import com.microsoft.azure.cosmosdb.internal.Paths;
import com.microsoft.azure.cosmosdb.internal.PathsHelper;
Expand Down Expand Up @@ -161,6 +162,10 @@ public Single<AddressInformation[]> tryGetAddresses(RxDocumentServiceRequest req
com.microsoft.azure.cosmosdb.rx.internal.Utils.checkNotNullOrThrow(request, "request", "");
com.microsoft.azure.cosmosdb.rx.internal.Utils.checkNotNullOrThrow(partitionKeyRangeIdentity, "partitionKeyRangeIdentity", "");

logger.debug("PartitionKeyRangeIdentity {}, forceRefreshPartitionAddresses {}",
partitionKeyRangeIdentity,
forceRefreshPartitionAddresses);

if (StringUtils.equals(partitionKeyRangeIdentity.getPartitionKeyRangeId(),
PartitionKeyRange.MASTER_PARTITION_KEY_RANGE_ID)) {

Expand All @@ -171,6 +176,8 @@ public Single<AddressInformation[]> tryGetAddresses(RxDocumentServiceRequest req
Instant suboptimalServerPartitionTimestamp = this.suboptimalServerPartitionTimestamps.get(partitionKeyRangeIdentity);

if (suboptimalServerPartitionTimestamp != null) {
logger.debug("suboptimalServerPartitionTimestamp is {}", suboptimalServerPartitionTimestamp);

boolean forceRefreshDueToSuboptimalPartitionReplicaSet = Duration.between(suboptimalServerPartitionTimestamp, Instant.now()).getSeconds()
> this.suboptimalPartitionForceRefreshIntervalInSeconds;

Expand All @@ -179,14 +186,20 @@ public Single<AddressInformation[]> tryGetAddresses(RxDocumentServiceRequest req
// and if they are equal, updates the key with a third value.
Instant newValue = this.suboptimalServerPartitionTimestamps.computeIfPresent(partitionKeyRangeIdentity,
(key, oldVal) -> {
logger.debug("key = {}, oldValue = {}", key, oldVal);

if (suboptimalServerPartitionTimestamp.equals(oldVal)) {
return Instant.MAX;
} else {
return oldVal;
}
});


logger.debug("newValue is {}", newValue);
if (!newValue.equals(suboptimalServerPartitionTimestamp)) {
logger.debug("setting forceRefreshPartitionAddresses to true");

// the value was replaced;
forceRefreshPartitionAddresses = true;
}
Expand All @@ -196,6 +209,8 @@ public Single<AddressInformation[]> tryGetAddresses(RxDocumentServiceRequest req
final boolean forceRefreshPartitionAddressesModified = forceRefreshPartitionAddresses;

if (forceRefreshPartitionAddressesModified) {
logger.debug("refresh serverPartitionAddressCache for {}", partitionKeyRangeIdentity);

this.serverPartitionAddressCache.refresh(
partitionKeyRangeIdentity,
() -> this.getAddressesForRangeId(
Expand All @@ -219,29 +234,38 @@ public Single<AddressInformation[]> tryGetAddresses(RxDocumentServiceRequest req
return addressesObs.map(
addresses -> {
if (notAllReplicasAvailable(addresses)) {
if (logger.isDebugEnabled()) {
logger.debug("not all replicas available {}", JavaStreamUtils.info(addresses));
}

this.suboptimalServerPartitionTimestamps.putIfAbsent(partitionKeyRangeIdentity, Instant.now());
}

return addresses;
}).onErrorResumeNext(ex -> {
DocumentClientException dce = com.microsoft.azure.cosmosdb.rx.internal.Utils.as(ex, DocumentClientException.class);
if (dce == null) {
logger.error("unexpected failure", ex);

if (forceRefreshPartitionAddressesModified) {
this.suboptimalServerPartitionTimestamps.remove(partitionKeyRangeIdentity);
}
return Single.error(ex);
} else {
assert dce != null;
logger.debug("tryGetAddresses dce", dce);
if (Exceptions.isStatusCode(dce, HttpConstants.StatusCodes.NOTFOUND) ||
Exceptions.isStatusCode(dce, HttpConstants.StatusCodes.GONE) ||
Exceptions.isSubStatusCode(dce, HttpConstants.SubStatusCodes.PARTITION_KEY_RANGE_GONE)) {
//remove from suboptimal cache in case the collection+pKeyRangeId combo is gone.
// remove from suboptimal cache in case the collection+pKeyRangeId combo is gone.
this.suboptimalServerPartitionTimestamps.remove(partitionKeyRangeIdentity);
return null;

logger.debug("tryGetAddresses: inner onErrorResumeNext return null", dce);
return Single.just(null);
}

return Single.error(ex);
}

});
}

Expand All @@ -250,6 +274,10 @@ Single<List<Address>> getServerAddressesViaGatewayAsync(
String collectionRid,
List<String> partitionKeyRangeIds,
boolean forceRefresh) {
if (logger.isDebugEnabled()) {
logger.debug("getServerAddressesViaGatewayAsync collectionRid {}, partitionKeyRangeIds {}", collectionRid,
JavaStreamUtils.toString(partitionKeyRangeIds, ","));
}
String entryUrl = PathsHelper.generatePath(ResourceType.Document, collectionRid, true);
HashMap<String, String> addressQuery = new HashMap<>();

Expand Down Expand Up @@ -303,6 +331,9 @@ Single<List<Address>> getServerAddressesViaGatewayAsync(
HttpClientUtils.parseResponseAsync(rsp));
return dsrObs.map(
dsr -> {
if (logger.isDebugEnabled()) {
logger.debug("getServerAddressesViaGatewayAsync deserializes result");
}
logAddressResolutionEnd(request, identifier);
List<Address> addresses = dsr.getQueryResponse(Address.class);
return addresses;
Expand All @@ -315,6 +346,7 @@ public void dispose() {
}

private Single<Pair<PartitionKeyRangeIdentity, AddressInformation[]>> resolveMasterAsync(RxDocumentServiceRequest request, boolean forceRefresh, Map<String, Object> properties) {
logger.debug("resolveMasterAsync forceRefresh: {}", forceRefresh);
Pair<PartitionKeyRangeIdentity, AddressInformation[]> masterAddressAndRangeInitial = this.masterPartitionAddressCache;

forceRefresh = forceRefresh ||
Expand Down Expand Up @@ -366,18 +398,27 @@ private Single<AddressInformation[]> getAddressesForRangeId(
String collectionRid,
String partitionKeyRangeId,
boolean forceRefresh) {
logger.debug("getAddressesForRangeId collectionRid {}, partitionKeyRangeId {}, forceRefresh {}",
collectionRid, partitionKeyRangeId, forceRefresh);

Single<List<Address>> addressResponse = this.getServerAddressesViaGatewayAsync(request, collectionRid, Collections.singletonList(partitionKeyRangeId), forceRefresh);

Single<List<Pair<PartitionKeyRangeIdentity, AddressInformation[]>>> addressInfos =
addressResponse.map(
addresses ->
addresses.stream().filter(addressInfo ->
addresses -> {
if (logger.isDebugEnabled()) {
logger.debug("addresses from getServerAddressesViaGatewayAsync in getAddressesForRangeId {}",
JavaStreamUtils.info(addresses));
}

return addresses.stream().filter(addressInfo ->
this.protocolScheme.equals(addressInfo.getProtocolScheme()))
.collect(Collectors.groupingBy(
address -> address.getParitionKeyRangeId()))
.values().stream()
.map(groupedAddresses -> toPartitionAddressAndRange(collectionRid, addresses))
.collect(Collectors.toList()));
.collect(Collectors.toList());
});

Single<List<Pair<PartitionKeyRangeIdentity, AddressInformation[]>>> result = addressInfos.map(addressInfo -> addressInfo.stream()
.filter(a ->
Expand All @@ -386,8 +427,11 @@ private Single<AddressInformation[]> getAddressesForRangeId(

return result.flatMap(
list -> {
if (list.isEmpty()) {
if (logger.isDebugEnabled()) {
logger.debug("getAddressesForRangeId flatMap got result {}", JavaStreamUtils.info(list));
}

if (list.isEmpty()) {
String errorMessage = String.format(
RMResources.PartitionKeyRangeNotFound,
partitionKeyRangeId,
Expand All @@ -400,7 +444,9 @@ private Single<AddressInformation[]> getAddressesForRangeId(
} else {
return Single.just(list.get(0).getRight());
}
});
}).doOnError(e -> {
logger.debug("getAddressesForRangeId", e);
});
}

Single<List<Address>> getMasterAddressesViaGatewayAsync(
Expand All @@ -411,6 +457,20 @@ Single<List<Address>> getMasterAddressesViaGatewayAsync(
boolean forceRefresh,
boolean useMasterCollectionResolver,
Map<String, Object> properties) {

logger.debug("getMasterAddressesViaGatewayAsync " +
"resourceType {}, " +
"resourceAddress {}, " +
"entryUrl {}, " +
"forceRefresh {}, " +
"useMasterCollectionResolver {}",
resourceType,
resourceAddress,
entryUrl,
forceRefresh,
useMasterCollectionResolver
);

HashMap<String, String> queryParameters = new HashMap<>();
queryParameters.put(HttpConstants.QueryStrings.URL, HttpUtils.urlEncode(entryUrl));
HashMap<String, String> headers = new HashMap<>(defaultRequestHeaders);
Expand Down Expand Up @@ -456,6 +516,7 @@ Single<List<Address>> getMasterAddressesViaGatewayAsync(
}

private Pair<PartitionKeyRangeIdentity, AddressInformation[]> toPartitionAddressAndRange(String collectionRid, List<Address> addresses) {
logger.debug("toPartitionAddressAndRange");
Address address = addresses.get(0);

AddressInformation[] addressInfos =
Expand All @@ -472,6 +533,10 @@ private static AddressInformation toAddressInformation(Address address) {
public Completable openAsync(
DocumentCollection collection,
List<PartitionKeyRangeIdentity> partitionKeyRangeIdentities) {
if (logger.isDebugEnabled()) {
logger.debug("openAsync {}", collection, JavaStreamUtils.toString(partitionKeyRangeIdentities, ","));
}

List<Observable<List<Address>>> tasks = new ArrayList<>();
int batchSize = GatewayAddressCache.DefaultBatchSize;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public Single<ShouldRetryResult> shouldRetry(Exception exception) {
(this.request.getPartitionKeyRangeIdentity() == null ||
this.request.getPartitionKeyRangeIdentity().getCollectionRid() == null)) &&
!(exception instanceof PartitionKeyRangeIsSplittingException)) {
logger.debug("Operation will NOT be retried. Current attempt {}, Exception: {} ", this.attemptCount,
logger.debug("Operation will NOT be retried. Current attempt {}, Exception:", this.attemptCount,
exception);
stopStopWatch(this.durationTimer);
return Single.just(ShouldRetryResult.noRetry());
Expand Down
2 changes: 1 addition & 1 deletion examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
<parent>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-cosmosdb-parent</artifactId>
<version>2.6.1</version>
<version>2.6.2</version>
</parent>

<artifactId>azure-cosmosdb-examples</artifactId>
Expand Down
Loading