diff --git a/sdk/cosmos/README.md b/sdk/cosmos/README.md
index eb92d12b5ca8..4727992f92e7 100644
--- a/sdk/cosmos/README.md
+++ b/sdk/cosmos/README.md
@@ -92,7 +92,7 @@ For example, using maven, you can add the following dependency to your maven pom
com.microsoft.azure
azure-cosmos
- 3.3.1
+ 3.3.2
```
[//]: # ({x-version-update-end})
diff --git a/sdk/cosmos/changelog/README.md b/sdk/cosmos/changelog/README.md
index 3c00e3cf3287..c45a524834bf 100644
--- a/sdk/cosmos/changelog/README.md
+++ b/sdk/cosmos/changelog/README.md
@@ -1,5 +1,10 @@
## Changelog
+### 3.3.2
+- ChangeFeedProcessor; fixes and extra logging related to the creations of the lease documents.
+- Port consistency policy bug fix (see https://github.com/Azure/azure-cosmosdb-java/pull/196)
+- Port test fixes (see https://github.com/Azure/azure-cosmosdb-java/pull/196)
+
### 3.3.1
- Added @JsonIgnore on getLogger in JsonSerializable
diff --git a/sdk/cosmos/microsoft-azure-cosmos-examples/src/test/java/com/azure/data/cosmos/rx/examples/ConflictAPITest.java b/sdk/cosmos/microsoft-azure-cosmos-examples/src/test/java/com/azure/data/cosmos/rx/examples/ConflictAPITest.java
index 4901b8aa7f03..0cac91a23212 100644
--- a/sdk/cosmos/microsoft-azure-cosmos-examples/src/test/java/com/azure/data/cosmos/rx/examples/ConflictAPITest.java
+++ b/sdk/cosmos/microsoft-azure-cosmos-examples/src/test/java/com/azure/data/cosmos/rx/examples/ConflictAPITest.java
@@ -15,6 +15,7 @@
import com.azure.data.cosmos.FeedResponse;
import com.azure.data.cosmos.PartitionKeyDefinition;
import com.azure.data.cosmos.internal.HttpConstants;
+import com.azure.data.cosmos.internal.ResourceResponse;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Ignore;
@@ -81,10 +82,12 @@ public void setUp() {
int numberOfDocuments = 20;
// Add documents
+ List> tasks = new ArrayList<>();
for (int i = 0; i < numberOfDocuments; i++) {
Document doc = new Document(String.format("{ 'id': 'loc%d', 'counter': %d}", i, i));
- client.createDocument(getCollectionLink(), doc, null, true).single().block();
+ tasks.add(client.createDocument(getCollectionLink(), doc, null, true).then());
}
+ Flux.merge(tasks).then().block();
}
@AfterClass(groups = "samples", timeOut = TIMEOUT)
diff --git a/sdk/cosmos/microsoft-azure-cosmos-examples/src/test/java/com/azure/data/cosmos/rx/examples/DocumentQueryAsyncAPITest.java b/sdk/cosmos/microsoft-azure-cosmos-examples/src/test/java/com/azure/data/cosmos/rx/examples/DocumentQueryAsyncAPITest.java
index 6f478e033a76..7a11b6419b93 100644
--- a/sdk/cosmos/microsoft-azure-cosmos-examples/src/test/java/com/azure/data/cosmos/rx/examples/DocumentQueryAsyncAPITest.java
+++ b/sdk/cosmos/microsoft-azure-cosmos-examples/src/test/java/com/azure/data/cosmos/rx/examples/DocumentQueryAsyncAPITest.java
@@ -106,10 +106,12 @@ public void setUp() {
numberOfDocuments = 20;
// Add documents
+ List> tasks = new ArrayList<>();
for (int i = 0; i < numberOfDocuments; i++) {
Document doc = new Document(String.format("{ 'id': 'loc%d', 'counter': %d}", i, i));
- client.createDocument(getCollectionLink(), doc, null, true).single().block();
+ tasks.add(client.createDocument(getCollectionLink(), doc, null, true).then());
}
+ Flux.merge(tasks).then().block();
}
@AfterClass(groups = "samples", timeOut = TIMEOUT)
diff --git a/sdk/cosmos/microsoft-azure-cosmos-examples/src/test/java/com/azure/data/cosmos/rx/examples/InMemoryGroupbyTest.java b/sdk/cosmos/microsoft-azure-cosmos-examples/src/test/java/com/azure/data/cosmos/rx/examples/InMemoryGroupbyTest.java
index b93b2cd68f96..f51c5c321d01 100644
--- a/sdk/cosmos/microsoft-azure-cosmos-examples/src/test/java/com/azure/data/cosmos/rx/examples/InMemoryGroupbyTest.java
+++ b/sdk/cosmos/microsoft-azure-cosmos-examples/src/test/java/com/azure/data/cosmos/rx/examples/InMemoryGroupbyTest.java
@@ -21,6 +21,7 @@
import org.testng.annotations.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.GroupedFlux;
+import reactor.core.publisher.Mono;
import java.time.LocalDateTime;
import java.util.ArrayList;
@@ -68,6 +69,7 @@ public void setUp() throws Exception {
int numberOfPayers = 10;
int numberOfDocumentsPerPayer = 10;
+ List> tasks = new ArrayList<>();
for (int i = 0; i < numberOfPayers; i++) {
@@ -81,11 +83,10 @@ public void setUp() throws Exception {
+ "'payer_id': %d, "
+ " 'created_time' : %d "
+ "}", UUID.randomUUID().toString(), i, currentTime.getSecond()));
- client.createDocument(getCollectionLink(), doc, null, true).single().block();
-
- Thread.sleep(100);
+ tasks.add(client.createDocument(getCollectionLink(), doc, null, true).then());
}
}
+ Flux.merge(tasks).then().block();
System.out.println("finished inserting documents");
}
diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/changefeed/implementation/BootstrapperImpl.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/changefeed/implementation/BootstrapperImpl.java
index ab4b5385d56a..0ce8aebec490 100644
--- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/changefeed/implementation/BootstrapperImpl.java
+++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/changefeed/implementation/BootstrapperImpl.java
@@ -61,6 +61,7 @@ public Mono initialize() {
if (initialized) {
return Mono.empty();
} else {
+ logger.info("Acquire initialization lock");
return this.leaseStore.acquireInitializationLock(this.lockTime)
.flatMap(lockAcquired -> {
this.isLockAcquired = lockAcquired;
@@ -74,7 +75,7 @@ public Mono initialize() {
}
})
.onErrorResume(throwable -> {
- logger.warn("Unexpected exception caught", throwable);
+ logger.warn("Unexpected exception caught while initializing the lock", throwable);
return Mono.just(this.isLockAcquired);
})
.flatMap(lockAcquired -> {
diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/changefeed/implementation/DocumentServiceLeaseStore.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/changefeed/implementation/DocumentServiceLeaseStore.java
index 8e29ae6f4112..cd60e4b9af6d 100644
--- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/changefeed/implementation/DocumentServiceLeaseStore.java
+++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/changefeed/implementation/DocumentServiceLeaseStore.java
@@ -14,6 +14,8 @@
import com.azure.data.cosmos.internal.changefeed.LeaseStore;
import com.azure.data.cosmos.internal.changefeed.RequestOptionsFactory;
import com.azure.data.cosmos.internal.changefeed.ServiceItemLease;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import java.time.Duration;
@@ -22,6 +24,7 @@
* Implementation for LeaseStore.
*/
class DocumentServiceLeaseStore implements LeaseStore {
+ private final Logger logger = LoggerFactory.getLogger(BootstrapperImpl.class);
private ChangeFeedContextClient client;
private String containerNamePrefix;
private CosmosContainer leaseCollectionLink;
@@ -51,16 +54,18 @@ public Mono isInitialized() {
CosmosItemRequestOptions requestOptions = this.requestOptionsFactory.createRequestOptions(
ServiceItemLease.fromDocument(doc));
- CosmosItem docItem = this.client.getContainerClient().getItem(markerDocId, "/id");
+ CosmosItem docItem = this.client.getContainerClient().getItem(markerDocId, markerDocId);
return this.client.readItem(docItem, requestOptions)
.flatMap(documentResourceResponse -> Mono.just(documentResourceResponse.item() != null))
.onErrorResume(throwable -> {
if (throwable instanceof CosmosClientException) {
CosmosClientException e = (CosmosClientException) throwable;
if (e.statusCode() == ChangeFeedHelper.HTTP_STATUS_CODE_NOT_FOUND) {
+ logger.info("Lease synchronization document not found");
return Mono.just(false);
}
}
+ logger.error("Unexpected exception thrown", throwable);
return Mono.error(throwable);
});
}
@@ -71,15 +76,17 @@ public Mono markInitialized() {
CosmosItemProperties containerDocument = new CosmosItemProperties();
containerDocument.id(markerDocId);
- return this.client.createItem(this.leaseCollectionLink, containerDocument, null, false)
+ return this.client.createItem(this.leaseCollectionLink, containerDocument, new CosmosItemRequestOptions(markerDocId), false)
.map( item -> true)
.onErrorResume(throwable -> {
if (throwable instanceof CosmosClientException) {
CosmosClientException e = (CosmosClientException) throwable;
if (e.statusCode() == ChangeFeedHelper.HTTP_STATUS_CODE_CONFLICT) {
+ logger.info("Lease synchronization document was created by a different instance");
return Mono.just(true);
}
}
+ logger.error("Unexpected exception thrown", throwable);
return Mono.just(false);
});
}
@@ -91,7 +98,7 @@ public Mono acquireInitializationLock(Duration lockExpirationTime) {
containerDocument.id(lockId);
BridgeInternal.setProperty(containerDocument, com.azure.data.cosmos.internal.Constants.Properties.TTL, Long.valueOf(lockExpirationTime.getSeconds()).intValue());
- return this.client.createItem(this.leaseCollectionLink, containerDocument, null, false)
+ return this.client.createItem(this.leaseCollectionLink, containerDocument, new CosmosItemRequestOptions(lockId), false)
.map(documentResourceResponse -> {
if (documentResourceResponse.item() != null) {
this.lockETag = documentResourceResponse.properties().etag();
@@ -104,9 +111,11 @@ public Mono acquireInitializationLock(Duration lockExpirationTime) {
if (throwable instanceof CosmosClientException) {
CosmosClientException e = (CosmosClientException) throwable;
if (e.statusCode() == ChangeFeedHelper.HTTP_STATUS_CODE_CONFLICT) {
+ logger.info("Lease synchronization document was acquired by a different instance");
return Mono.just(false);
}
}
+ logger.error("Unexpected exception thrown", throwable);
return Mono.error(throwable);
});
}
@@ -129,7 +138,7 @@ public Mono releaseInitializationLock() {
accessCondition.condition(this.lockETag);
requestOptions.accessCondition(accessCondition);
- CosmosItem docItem = this.client.getContainerClient().getItem(lockId, "/id");
+ CosmosItem docItem = this.client.getContainerClient().getItem(lockId, lockId);
return this.client.deleteItem(docItem, requestOptions)
.map(documentResourceResponse -> {
if (documentResourceResponse.item() != null) {
@@ -143,10 +152,11 @@ public Mono releaseInitializationLock() {
if (throwable instanceof CosmosClientException) {
CosmosClientException e = (CosmosClientException) throwable;
if (e.statusCode() == ChangeFeedHelper.HTTP_STATUS_CODE_CONFLICT) {
+ logger.info("Lease synchronization document was acquired by a different instance");
return Mono.just(false);
}
}
-
+ logger.error("Unexpected exception thrown", throwable);
return Mono.error(throwable);
});
}
diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/changefeed/implementation/LeaseRenewerImpl.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/changefeed/implementation/LeaseRenewerImpl.java
index cf466b109d03..3ed4d9451f6e 100644
--- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/changefeed/implementation/LeaseRenewerImpl.java
+++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/changefeed/implementation/LeaseRenewerImpl.java
@@ -63,7 +63,11 @@ public Mono run(CancellationToken cancellationToken) {
})
.then()
.onErrorResume(throwable -> {
- logger.error("Partition {}: renew lease loop failed.", this.lease.getLeaseToken(), throwable);
+ if (throwable instanceof LeaseLostException) {
+ logger.info("Partition {}: renew lease loop failed.", this.lease.getLeaseToken(), throwable);
+ } else {
+ logger.error("Partition {}: renew lease loop failed.", this.lease.getLeaseToken(), throwable);
+ }
return Mono.error(throwable);
});
}
diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/changefeed/implementation/LeaseStoreManagerImpl.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/changefeed/implementation/LeaseStoreManagerImpl.java
index 593860803120..e5c4208e9aad 100644
--- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/changefeed/implementation/LeaseStoreManagerImpl.java
+++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/changefeed/implementation/LeaseStoreManagerImpl.java
@@ -453,6 +453,6 @@ private String getPartitionLeasePrefix() {
}
private CosmosItem createItemForLease(String leaseId) {
- return this.leaseDocumentClient.getContainerClient().getItem(leaseId, "/id");
+ return this.leaseDocumentClient.getContainerClient().getItem(leaseId, leaseId);
}
}
diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/ConsistencyWriter.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/ConsistencyWriter.java
index 3c51d58fd756..00d196bed6a7 100644
--- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/ConsistencyWriter.java
+++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/ConsistencyWriter.java
@@ -170,7 +170,7 @@ Mono writePrivateAsync(
try {
BridgeInternal.recordResponse(request.requestContext.cosmosResponseDiagnostics, request,
storeReader.createStoreResult(null, ex, false, false, primaryUri));
- } catch (CosmosClientException e) {
+ } catch (Exception e) {
logger.error("Error occurred while recording response", e);
}
String value = ex.responseHeaders().get(HttpConstants.HttpHeaders.WRITE_REQUEST_TRIGGER_ADDRESS_REFRESH);
@@ -191,7 +191,7 @@ Mono writePrivateAsync(
try {
BridgeInternal.recordResponse(request.requestContext.cosmosResponseDiagnostics, request,
storeReader.createStoreResult(response, null, false, false, primaryURI.get()));
- } catch (CosmosClientException e) {
+ } catch (Exception e) {
logger.error("Error occurred while recording response", e);
}
return barrierForGlobalStrong(request, response);
@@ -309,43 +309,35 @@ private Mono waitForWriteBarrierAsync(RxDocumentServiceRequest barrierR
}
//get max global committed lsn from current batch of responses, then update if greater than max of all batches.
- long maxGlobalCommittedLsn = (responses != null || !responses.isEmpty()) ?
- (Long) responses.stream().map(s -> s.globalCommittedLSN).max(ComparatorUtils.NATURAL_COMPARATOR).get() :
+ long maxGlobalCommittedLsn = (responses != null) ?
+ (Long) responses.stream().map(s -> s.globalCommittedLSN).max(ComparatorUtils.NATURAL_COMPARATOR).orElse(0L) :
0L;
+
maxGlobalCommittedLsnReceived.set(maxGlobalCommittedLsnReceived.get() > maxGlobalCommittedLsn ?
maxGlobalCommittedLsnReceived.get() : maxGlobalCommittedLsn);
//only refresh on first barrier call, set to false for subsequent attempts.
barrierRequest.requestContext.forceRefreshAddressCache = false;
- //trace on last retry.
+ //get max global committed lsn from current batch of responses, then update if greater than max of all batches.
if (writeBarrierRetryCount.getAndDecrement() == 0) {
logger.debug("ConsistencyWriter: WaitForWriteBarrierAsync - Last barrier multi-region strong. Responses: {}",
responses.stream().map(StoreResult::toString).collect(Collectors.joining("; ")));
+ logger.debug("ConsistencyWriter: Highest global committed lsn received for write barrier call is {}", maxGlobalCommittedLsnReceived);
+ return Mono.just(Boolean.FALSE);
}
return Mono.empty();
}).flux();
- }).repeatWhen(s -> {
- if (writeBarrierRetryCount.get() == 0) {
- return Flux.empty();
+ }).repeatWhen(s -> s.flatMap(x -> {
+ // repeat with a delay
+ if ((ConsistencyWriter.MAX_NUMBER_OF_WRITE_BARRIER_READ_RETRIES - writeBarrierRetryCount.get()) > ConsistencyWriter.MAX_SHORT_BARRIER_RETRIES_FOR_MULTI_REGION) {
+ return Mono.delay(Duration.ofMillis(ConsistencyWriter.DELAY_BETWEEN_WRITE_BARRIER_CALLS_IN_MS)).flux();
} else {
-
- if ((ConsistencyWriter.MAX_NUMBER_OF_WRITE_BARRIER_READ_RETRIES - writeBarrierRetryCount.get()) > ConsistencyWriter.MAX_SHORT_BARRIER_RETRIES_FOR_MULTI_REGION) {
- return Flux.just(0L).delayElements(Duration.ofMillis(ConsistencyWriter.DELAY_BETWEEN_WRITE_BARRIER_CALLS_IN_MS));
- } else {
- return Flux.just(0L).delayElements(Duration.ofMillis(ConsistencyWriter.SHORT_BARRIER_RETRY_INTERVAL_IN_MS_FOR_MULTI_REGION));
- }
+ return Mono.delay(Duration.ofMillis(ConsistencyWriter.SHORT_BARRIER_RETRY_INTERVAL_IN_MS_FOR_MULTI_REGION)).flux();
}
- }).take(1)
- .switchIfEmpty(Mono.defer(() -> {
- // after retries exhausted print this log and return false
- logger.debug("ConsistencyWriter: Highest global committed lsn received for write barrier call is {}", maxGlobalCommittedLsnReceived);
-
- return Mono.just(false);
- }))
- .map(r -> r)
- .single();
+ })
+ ).take(1).single();
}
static void getLsnAndGlobalCommittedLsn(StoreResponse response, Utils.ValueHolder lsn, Utils.ValueHolder globalCommittedLsn) {
diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/StoreReader.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/StoreReader.java
index 72e58edfa8d6..1d2dd33be114 100644
--- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/StoreReader.java
+++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/StoreReader.java
@@ -248,7 +248,11 @@ private Flux> readFromReplicas(List resultCollect
for (StoreResult srr : newStoreResults) {
entity.requestContext.requestChargeTracker.addCharge(srr.requestCharge);
- BridgeInternal.recordResponse(entity.requestContext.cosmosResponseDiagnostics, entity, srr);
+ try {
+ BridgeInternal.recordResponse(entity.requestContext.cosmosResponseDiagnostics, entity, srr);
+ } catch (Exception e) {
+ logger.error("Unexpected failure while recording response", e);
+ }
if (srr.isValid) {
try {
@@ -557,7 +561,11 @@ private Mono readPrimaryInternalAsync(
});
return storeResultObs.map(storeResult -> {
- BridgeInternal.recordResponse(entity.requestContext.cosmosResponseDiagnostics, entity, storeResult);
+ try {
+ BridgeInternal.recordResponse(entity.requestContext.cosmosResponseDiagnostics, entity, storeResult);
+ } catch (Exception e) {
+ logger.error("Unexpected failure while recording response", e);
+ }
entity.requestContext.requestChargeTracker.addCharge(storeResult.requestCharge);
if (storeResult.isGoneException && !storeResult.isInvalidPartitionException) {
diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/SessionTest.java b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/SessionTest.java
index e3253f9f49c8..f74431eb3b71 100644
--- a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/SessionTest.java
+++ b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/SessionTest.java
@@ -111,21 +111,17 @@ public void sessionConsistency_ReadYourWrites(boolean isNameBased) {
Document documentCreated = spyClient.createDocument(getCollectionLink(isNameBased), new Document(), null, false)
.blockFirst().getResource();
- // We send session tokens on Writes in GATEWAY mode
- if (connectionMode == ConnectionMode.GATEWAY) {
- assertThat(getSessionTokensInRequests()).hasSize(3 * i + 1);
- assertThat(getSessionTokensInRequests().get(3 * i + 0)).isNotEmpty();
- }
+ spyClient.clearCapturedRequests();
spyClient.readDocument(getDocumentLink(documentCreated, isNameBased), options).blockFirst();
- assertThat(getSessionTokensInRequests()).hasSize(3 * i + 2);
- assertThat(getSessionTokensInRequests().get(3 * i + 1)).isNotEmpty();
+ assertThat(getSessionTokensInRequests()).hasSize(1);
+ assertThat(getSessionTokensInRequests().get(0)).isNotEmpty();
spyClient.readDocument(getDocumentLink(documentCreated, isNameBased), options).blockFirst();
- assertThat(getSessionTokensInRequests()).hasSize(3 * i + 3);
- assertThat(getSessionTokensInRequests().get(3 * i + 2)).isNotEmpty();
+ assertThat(getSessionTokensInRequests()).hasSize(2);
+ assertThat(getSessionTokensInRequests().get(1)).isNotEmpty();
}
}
diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/directconnectivity/GatewayAddressCacheTest.java b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/directconnectivity/GatewayAddressCacheTest.java
index 9f43c1c8f914..da3991318195 100644
--- a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/directconnectivity/GatewayAddressCacheTest.java
+++ b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/directconnectivity/GatewayAddressCacheTest.java
@@ -385,7 +385,10 @@ public Mono> answer(InvocationOnMock invocationOnMock) throws Thro
.describedAs("getServerAddressesViaGatewayAsync will read addresses from gateway")
.asList().hasSize(1);
httpClientWrapper.capturedRequests.clear();
- assertThat(suboptimalAddresses).hasSize(ServiceConfig.SystemReplicationPolicy.MaxReplicaSetSize - 1);
+
+ // relaxes one replica being down
+ assertThat(suboptimalAddresses.length).isLessThanOrEqualTo((ServiceConfig.SystemReplicationPolicy.MaxReplicaSetSize - 1));
+ assertThat(suboptimalAddresses.length).isGreaterThanOrEqualTo(ServiceConfig.SystemReplicationPolicy.MaxReplicaSetSize - 2);
assertThat(fetchCounter.get()).isEqualTo(1);
// no refresh, use cache
diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/query/DocumentProducerTest.java b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/query/DocumentProducerTest.java
index 062847e91c85..0a2d50d0e1f9 100644
--- a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/query/DocumentProducerTest.java
+++ b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/query/DocumentProducerTest.java
@@ -61,7 +61,7 @@
public class DocumentProducerTest {
private final static Logger logger = LoggerFactory.getLogger(DocumentProducerTest.class);
- private static final long TIMEOUT = 10000;
+ private static final long TIMEOUT = 20000;
private final static String OrderByPayloadFieldName = "payload";
private final static String OrderByItemsFieldName = "orderByItems";
diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/rx/DocumentClientResourceLeakTest.java b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/rx/DocumentClientResourceLeakTest.java
index fca51e0e65bb..a471b69ef4b6 100644
--- a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/rx/DocumentClientResourceLeakTest.java
+++ b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/rx/DocumentClientResourceLeakTest.java
@@ -61,7 +61,7 @@ public void resourceLeak() throws Exception {
usedMemoryInBytesBefore / (double)ONE_MB,
(usedMemoryInBytesAfter - usedMemoryInBytesBefore) / (double)ONE_MB);
- assertThat(usedMemoryInBytesAfter - usedMemoryInBytesBefore).isLessThan(125 * ONE_MB);
+ assertThat(usedMemoryInBytesAfter - usedMemoryInBytesBefore).isLessThan(300 * ONE_MB);
}
@BeforeClass(groups = {"emulator"}, timeOut = SETUP_TIMEOUT)