Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
0bb32e1
Corrected package misspelling in log4j.properties, removed System.exi…
Nov 9, 2019
921d5cc
Merge branch 'feature/cosmos/v4' of github.com:David-Noble-at-work/az…
Nov 9, 2019
2028dec
Merge branch 'feature/cosmos/v4' of https://github.com/Azure/azure-sd…
Nov 9, 2019
30f5fd3
Fixed a merge error and resolved #5043, Variables that specify time i…
Nov 9, 2019
7a9b947
Minor code cleanup in RntbdTransportClient
Nov 9, 2019
89bed8f
Fixed a second merge issue
Nov 9, 2019
6430a70
Re-enabled all tests
Nov 12, 2019
a1d0118
Attempt to fix a test break
Nov 12, 2019
077d88a
Fixed a javadoc issue and bumped the value of RntbdRequestTimer.TIMER…
Nov 12, 2019
98981fd
Fixed a javadoc issues
Nov 12, 2019
e9d07b0
Fixed a javadoc issue
Nov 12, 2019
7bcb84c
Reenabled direct tcp backpressure tests
Nov 13, 2019
e16b79b
Fixed a javadoc issue
Nov 13, 2019
849cbb4
Updated collection cache layer logic with v2. Removed Flux.empty() an…
kushagraThapar Nov 13, 2019
e02739d
Fixed query test issues
kushagraThapar Nov 13, 2019
1306222
Fixed emulator breaking change
kushagraThapar Nov 13, 2019
30b5d05
Attempt to fix CosmosPartitionKeyTests::testNonPartitionedCollectionO…
Nov 13, 2019
2c0455a
Fixed DocumentProducerTest
kushagraThapar Nov 13, 2019
2d5c525
Merge branch 'null_value_holder_change' of https://github.com/kushagr…
Nov 13, 2019
3564497
Tweaked code comments
Nov 14, 2019
8002aa6
Renamed beforeMethods for easier interpretation of the TestNG logs. W…
Nov 14, 2019
c40b522
Renamed beforeClass methods for easier interpretation of the TestNG l…
Nov 14, 2019
379486b
Workaround issue #6346 to identify test failures that might lie behin…
Nov 14, 2019
37e3186
Workaround issue #6346 to identify test failures that might lie behin…
Nov 14, 2019
824901b
Attempt to resolve a number of test failures
Nov 15, 2019
fad6705
Attempt to resolve a number of test failures
Nov 15, 2019
c9abc71
Merge branch 'feature/cosmos/v4' of https://github.com/Azure/azure-sd…
Nov 15, 2019
34cb24c
Attempt to resolve a number of test failures
Nov 16, 2019
d1d0183
Ignore ConsistencyTests1::validateConsistentPrefixOnAsyncReplication …
Nov 16, 2019
c3872d0
Attempt to workaround VeryLargeDocumentQueryTest::queryLargeDocuments…
Nov 16, 2019
925381a
Attempt to workaround VeryLargeDocumentQueryTest::queryLargeDocuments…
Nov 16, 2019
5ae65a8
Updated test TODOs based on current set of issues
Nov 17, 2019
32c42bf
Added TODOs and bumped SETUP_TIMEOUT for those test classes that rout…
Nov 17, 2019
3172b7b
Enabled LogLevelTest::createDocumentWithTraceLevelAtRoot
Nov 17, 2019
c5bd5a1
Added TODO and bumped timeout interval for before_OrderbyDocumentQuer…
Nov 18, 2019
b087b96
Added TODO, bumped timeout interval, and revised error handling strat…
Nov 18, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ public Object[][] collectionLinkTypeArgProvider() {
};
}

// FIXME: Test is flaky, fails inconsistently
@Test(dataProvider = "collectionLinkTypeArgProvider", groups = "e2e")
public void readMyWrites(boolean useNameLink) throws Exception {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import com.azure.cosmos.implementation.HttpConstants;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Ignore;
import org.testng.annotations.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand All @@ -41,8 +40,7 @@
* transform a flux to CompletableFuture. Please see
* {@link #transformObservableToCompletableFuture()}
*/
// FIXME: setup method times out inconsistently
@Ignore

public class ConflictAPITest extends DocumentClientTest {
private final static int TIMEOUT = 60000;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,7 @@
* transform a flux to CompletableFuture. Please see
* {@link #transformObservableToCompletableFuture()}
*/
//FIXME: beforeClass times out inconsistently
@Ignore

public class DocumentCRUDAsyncAPITest extends DocumentClientTest {

private final static String PARTITION_KEY_PATH = "/mypk";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import com.azure.cosmos.SqlQuerySpec;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Ignore;
import org.testng.annotations.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.GroupedFlux;
Expand All @@ -28,8 +27,6 @@
import java.util.List;
import java.util.UUID;

//FIXME setup method times out occasionally when running against emulator.
@Ignore
public class InMemoryGroupbyTest extends DocumentClientTest {

private final static int TIMEOUT = 60000;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.hamcrest.Matchers;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Ignore;
import org.testng.annotations.Test;
import reactor.core.publisher.Flux;

Expand All @@ -38,8 +37,6 @@ public class UniqueIndexAsyncAPITest extends DocumentClientTest {
private AsyncDocumentClient client;
private Database createdDatabase;

//FIXME: Times out when running in emulator tests.
@Ignore
@Test(groups = "samples", timeOut = TIMEOUT)
public void uniqueIndex() {
DocumentCollection collectionDefinition = new DocumentCollection();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ public enum ConflictResolutionMode {
/**
* Last writer wins conflict resolution mode
*
* Setting the ConflictResolutionMode to "LAST_WRITER_WINS" indicates that conflict resolution should be done by inspecting a field in the conflicting documents
* and picking the document which has the higher value in that path. See {@link ConflictResolutionPolicy#conflictResolutionPath()} for details on how to specify the path
* Setting the ConflictResolutionMode to "LAST_WRITER_WINS" indicates that conflict resolution should be done by
* inspecting a field in the conflicting documents and picking the document which has the higher value in that
* path. See {@link ConflictResolutionPolicy#getConflictResolutionPath} for details on how to specify the path
* to be checked for conflict resolution. Also note that Deletes win.
*/
LAST_WRITER_WINS,
Expand All @@ -25,7 +26,7 @@ public enum ConflictResolutionMode {
* The user could elect to register a user specified {@link StoredProcedure} for handling conflicting resources.
* Should the user not register a user specified StoredProcedure, conflicts will default to being made available as {@link Conflict} resources,
* which the user can inspect and manually resolve.
* See {@link ConflictResolutionPolicy#conflictResolutionProcedure()} for details on how to specify the stored procedure
* See {@link ConflictResolutionPolicy#getConflictResolutionProcedure()} for details on how to specify the stored procedure
* to run for conflict resolution.
*/
CUSTOM,
Expand All @@ -34,10 +35,10 @@ public enum ConflictResolutionMode {
* INVALID or unknown mode.
*/
INVALID;

@Override
public String toString() {
return StringUtils.remove(WordUtils.capitalizeFully(this.name(), '_'), '_');
return StringUtils.remove(WordUtils.capitalizeFully(this.name(), '_'), '_');
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,7 @@ public class ConflictResolutionPolicy extends JsonSerializable {
* Creates a LAST_WRITER_WINS {@link ConflictResolutionPolicy} with "/_ts" as the resolution path.
*
* In case of a conflict occurring on a document, the document with the higher integer value in the default path
* {@link Resource#timestamp()}, i.e., "/_ts" will be used.
* {@link Resource#getTimestamp()}, i.e., "/_ts" will be used.
* {@link Resource#getTimestamp}, i.e., "/_ts" will be used.
*
* @return ConflictResolutionPolicy.
*/
Expand Down Expand Up @@ -104,7 +103,7 @@ public static ConflictResolutionPolicy createLastWriterWinsPolicy(String conflic
* <ul>
* <li>In case the stored procedure fails or throws an exception,
* the conflict resolution will default to registering conflicts in the conflicts feed</li>
* <li>The user can provide the stored procedure @see {@link Resource#id()}</li>
* <li>The user can provide the stored procedure @see {@link Resource#getId()}</li>
* </ul>
* @param conflictResolutionSprocName stored procedure to perform conflict resolution.
* @return ConflictResolutionPolicy.
Expand Down Expand Up @@ -178,7 +177,7 @@ ConflictResolutionPolicy setMode(ConflictResolutionMode mode) {
* Gets the path which is present in each document in the Azure Cosmos DB service for last writer wins conflict-resolution.
* This path must be present in each document and must be an integer value.
* In case of a conflict occurring on a document, the document with the higher integer value in the specified path will be picked.
* If the path is unspecified, by default the {@link Resource#timestamp()} path will be used.
* If the path is unspecified, by default the {@link Resource#getTimestamp()} path will be used.
*
* This value should only be set when using {@link ConflictResolutionMode#LAST_WRITER_WINS}
*
Expand All @@ -193,7 +192,7 @@ public String getConflictResolutionPath() {
* Sets the path which is present in each document in the Azure Cosmos DB service for last writer wins conflict-resolution.
* This path must be present in each document and must be an integer value.
* In case of a conflict occurring on a document, the document with the higher integer value in the specified path will be picked.
* If the path is unspecified, by default the {@link Resource#timestamp()} path will be used.
* If the path is unspecified, by default the {@link Resource#getTimestamp()} path will be used.
*
* This value should only be set when using {@link ConflictResolutionMode#LAST_WRITER_WINS}
*
Expand All @@ -213,7 +212,7 @@ ConflictResolutionPolicy setConflictResolutionPath(String value) {
* <li>This value should only be set when using {@link ConflictResolutionMode#CUSTOM}</li>
* <li>In case the stored procedure fails or throws an exception,
* the conflict resolution will default to registering conflicts in the conflicts feed</li>
* <li>The user can provide the stored procedure @see {@link Resource#id()}</li>
* <li>The user can provide the stored procedure @see {@link Resource#getId}</li>
* </ul>
**
* @return the stored procedure to perform conflict resolution.]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@
* This is meant to be internally used only by our sdk.
**/
public interface ICollectionRoutingMapCache {
default Mono<CollectionRoutingMap> tryLookupAsync(
default Mono<Utils.ValueHolder<CollectionRoutingMap>> tryLookupAsync(
String collectionRid,
CollectionRoutingMap previousValue,
Map<String, Object> properties) {
return tryLookupAsync(collectionRid, previousValue, false, properties);
}

Mono<CollectionRoutingMap> tryLookupAsync(
Mono<Utils.ValueHolder<CollectionRoutingMap>> tryLookupAsync(
String collectionRid,
CollectionRoutingMap previousValue,
boolean forceRefreshCollectionRoutingMap,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ public interface IRoutingMapProvider {
/// <param name="range">This method will return all ranges which overlap this range.</param>
/// <param name="forceRefresh">Whether forcefully refreshing the routing map is necessary</param>
/// <returns>List of effective partition key ranges for a collection or null if collection doesn't exist.</returns>
Mono<List<PartitionKeyRange>> tryGetOverlappingRangesAsync(String collectionResourceId, Range<String> range,
boolean forceRefresh /* = false */, Map<String, Object> properties);
Mono<Utils.ValueHolder<List<PartitionKeyRange>>> tryGetOverlappingRangesAsync(String collectionResourceId, Range<String> range,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This changes already are already merged:
#6312

Are there more changes? or the branch is stale? could you merge the source branch back to your branch

boolean forceRefresh /* = false */, Map<String, Object> properties);

Mono<PartitionKeyRange> tryGetPartitionKeyRangeByIdAsync(String collectionResourceId, String partitionKeyRangeId,
Mono<Utils.ValueHolder<PartitionKeyRange>> tryGetPartitionKeyRangeByIdAsync(String collectionResourceId, String partitionKeyRangeId,
boolean forceRefresh /* = false */, Map<String, Object> properties);
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,15 @@ public PartitionKeyRangeGoneRetryPolicy(
this.feedOptions = feedOptions;
}

/// <summary>
/// <summary>
/// Should the caller retry the operation.
/// </summary>
/// <param name="exception">Exception that occured when the operation was tried</param>
/// <param name="cancellationToken"></param>
/// <returns>True indicates caller should retry, False otherwise</returns>
public Mono<ShouldRetryResult> shouldRetry(Exception exception) {
CosmosClientException clientException = Utils.as(exception, CosmosClientException.class);
if (clientException != null &&
if (clientException != null &&
Exceptions.isStatusCode(clientException, HttpConstants.StatusCodes.GONE) &&
Exceptions.isSubStatusCode(clientException, HttpConstants.SubStatusCodes.PARTITION_KEY_RANGE_GONE)) {

Expand All @@ -64,28 +64,30 @@ public Mono<ShouldRetryResult> shouldRetry(Exception exception) {
if (this.feedOptions != null) {
request.properties = this.feedOptions.properties();
}
Mono<DocumentCollection> collectionObs = this.collectionCache.resolveCollectionAsync(request);
Mono<Utils.ValueHolder<DocumentCollection>> collectionObs = this.collectionCache.resolveCollectionAsync(request);

return collectionObs.flatMap(collection -> {
return collectionObs.flatMap(collectionValueHolder -> {

Mono<CollectionRoutingMap> routingMapObs = this.partitionKeyRangeCache.tryLookupAsync(collection.getResourceId(), null, request.properties);
Mono<Utils.ValueHolder<CollectionRoutingMap>> routingMapObs = this.partitionKeyRangeCache.tryLookupAsync(collectionValueHolder.v.getResourceId(),
null, request.properties);

Mono<CollectionRoutingMap> refreshedRoutingMapObs = routingMapObs.flatMap(routingMap -> {
// Force refresh.
return this.partitionKeyRangeCache.tryLookupAsync(
collection.getResourceId(),
routingMap,
Mono<Utils.ValueHolder<CollectionRoutingMap>> refreshedRoutingMapObs = routingMapObs.flatMap(routingMapValueHolder -> {
if (routingMapValueHolder.v != null) {
// Force refresh.
return this.partitionKeyRangeCache.tryLookupAsync(
collectionValueHolder.v.getResourceId(),
routingMapValueHolder.v,
request.properties);
}).switchIfEmpty(Mono.defer(Mono::empty));
} else {
return Mono.just(new Utils.ValueHolder<>(null));
}
});

// TODO: Check if this behavior can be replaced by doOnSubscribe
return refreshedRoutingMapObs.flatMap(rm -> {
this.retried = true;
return Mono.just(ShouldRetryResult.retryAfter(Duration.ZERO));
}).switchIfEmpty(Mono.defer(() -> {
this.retried = true;
return Mono.just(ShouldRetryResult.retryAfter(Duration.ZERO));
}));
});

});

Expand All @@ -96,7 +98,7 @@ public Mono<ShouldRetryResult> shouldRetry(Exception exception) {

@Override
public void onBeforeSendRequest(RxDocumentServiceRequest request) {
this.nextRetryPolicy.onBeforeSendRequest(request);
this.nextRetryPolicy.onBeforeSendRequest(request);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -61,17 +61,16 @@ public Mono<ShouldRetryResult> shouldRetry(Exception e) {
request.forceNameCacheRefresh = true;
request.requestContext.resolvedCollectionRid = null;

Mono<DocumentCollection> collectionObs = this.collectionCache.resolveCollectionAsync(request);
Mono<Utils.ValueHolder<DocumentCollection>> collectionObs = this.collectionCache.resolveCollectionAsync(request);

return collectionObs.flatMap(collectionInfo -> {
if (!StringUtils.isEmpty(oldCollectionRid) && !StringUtils.isEmpty(collectionInfo.getResourceId())) {
return collectionObs.flatMap(collectionValueHolder -> {
if (collectionValueHolder.v == null) {
logger.warn("Can't recover from session unavailable exception because resolving collection name {} returned null", request.getResourceAddress());
} else if (!StringUtils.isEmpty(oldCollectionRid) && !StringUtils.isEmpty(collectionValueHolder.v.getResourceId())) {
return Mono.just(ShouldRetryResult.retryAfter(Duration.ZERO));
}
return Mono.just(shouldRetryResult);
}).switchIfEmpty(Mono.defer(() -> {
logger.warn("Can't recover from session unavailable exception because resolving collection name {} returned null", request.getResourceAddress());
return Mono.just(shouldRetryResult);
})).onErrorResume(throwable -> {
}).onErrorResume(throwable -> {
// When resolveCollectionAsync throws an exception ignore it because it's an attempt to recover an existing
// error. When the recovery fails we return ShouldRetryResult.noRetry and propagate the original exception to the client

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -869,19 +869,19 @@ private Map<String, String> getRequestHeaders(RequestOptions options) {
private Mono<RxDocumentServiceRequest> addPartitionKeyInformation(RxDocumentServiceRequest request, Document document,
RequestOptions options) {

Mono<DocumentCollection> collectionObs = this.collectionCache.resolveCollectionAsync(request);
Mono<Utils.ValueHolder<DocumentCollection>> collectionObs = this.collectionCache.resolveCollectionAsync(request);
return collectionObs
.map(collection -> {
addPartitionKeyInformation(request, document, options, collection);
.map(collectionValueHolder -> {
addPartitionKeyInformation(request, document, options, collectionValueHolder.v);
return request;
});
}

private Mono<RxDocumentServiceRequest> addPartitionKeyInformation(RxDocumentServiceRequest request, Document document, RequestOptions options,
Mono<DocumentCollection> collectionObs) {
Mono<Utils.ValueHolder<DocumentCollection>> collectionObs) {

return collectionObs.map(collection -> {
addPartitionKeyInformation(request, document, options, collection);
return collectionObs.map(collectionValueHolder -> {
addPartitionKeyInformation(request, document, options, collectionValueHolder.v);
return request;
});
}
Expand Down Expand Up @@ -969,7 +969,7 @@ private Mono<RxDocumentServiceRequest> getCreateDocumentRequest(String documentC
RxDocumentServiceRequest request = RxDocumentServiceRequest.create(operationType, ResourceType.Document, path,
typedDocument, requestHeaders, options);

Mono<DocumentCollection> collectionObs = this.collectionCache.resolveCollectionAsync(request);
Mono<Utils.ValueHolder<DocumentCollection>> collectionObs = this.collectionCache.resolveCollectionAsync(request);
return addPartitionKeyInformation(request, typedDocument, options, collectionObs);
}

Expand Down Expand Up @@ -1217,7 +1217,7 @@ private Flux<ResourceResponse<Document>> replaceDocumentInternal(String document

validateResource(document);

Mono<DocumentCollection> collectionObs = collectionCache.resolveCollectionAsync(request);
Mono<Utils.ValueHolder<DocumentCollection>> collectionObs = collectionCache.resolveCollectionAsync(request);
Mono<RxDocumentServiceRequest> requestObs = addPartitionKeyInformation(request, document, options, collectionObs);

return requestObs.flux().flatMap(req -> {
Expand Down Expand Up @@ -1247,7 +1247,7 @@ private Flux<ResourceResponse<Document>> deleteDocumentInternal(String documentL
RxDocumentServiceRequest request = RxDocumentServiceRequest.create(OperationType.Delete,
ResourceType.Document, path, requestHeaders, options);

Mono<DocumentCollection> collectionObs = collectionCache.resolveCollectionAsync(request);
Mono<Utils.ValueHolder<DocumentCollection>> collectionObs = collectionCache.resolveCollectionAsync(request);

Mono<RxDocumentServiceRequest> requestObs = addPartitionKeyInformation(request, null, options, collectionObs);

Expand Down Expand Up @@ -1283,7 +1283,7 @@ private Flux<ResourceResponse<Document>> readDocumentInternal(String documentLin
RxDocumentServiceRequest request = RxDocumentServiceRequest.create(OperationType.Read,
ResourceType.Document, path, requestHeaders, options);

Mono<DocumentCollection> collectionObs = this.collectionCache.resolveCollectionAsync(request);
Mono<Utils.ValueHolder<DocumentCollection>> collectionObs = this.collectionCache.resolveCollectionAsync(request);

Mono<RxDocumentServiceRequest> requestObs = addPartitionKeyInformation(request, null, options, collectionObs);

Expand Down Expand Up @@ -2587,7 +2587,7 @@ private <T extends Resource> Flux<FeedResponse<T>> readFeedCollectionChild(FeedO

Function<RxDocumentServiceRequest, Flux<FeedResponse<T>>> executeFunc = request -> {
return ObservableHelper.inlineIfPossibleAsObs(() -> {
Mono<DocumentCollection> collectionObs = this.collectionCache.resolveCollectionAsync(request);
Mono<Utils.ValueHolder<DocumentCollection>> collectionObs = this.collectionCache.resolveCollectionAsync(request);
Mono<RxDocumentServiceRequest> requestObs = this.addPartitionKeyInformation(request, null, requestOptions, collectionObs);

return requestObs.flux().flatMap(req -> this.readFeed(req)
Expand Down
Loading