Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
748ca7f
draft fix for global strong retry
moderakh May 22, 2019
5340176
bumped timeout values
moderakh May 23, 2019
8750820
debugging
moderakh May 23, 2019
c0de539
cleanup
moderakh May 23, 2019
3fc20b3
added e2e test profile, relaxed replica counting test
moderakh May 28, 2019
0a4a4ee
read writes test
moderakh May 28, 2019
591fad2
tune test
moderakh May 29, 2019
5b6b596
Merge branch 'master' into users/moderakh/global-strong-draft-bugfix
moderakh Jun 3, 2019
acdaff5
retry analyzer renamed
moderakh Jun 4, 2019
deb6d36
Merge branch 'master' into users/moderakh/global-strong-draft-bugfix
moderakh Jun 4, 2019
0a21777
Merge branch 'users/moderakh/global-strong-draft-bugfix' of github.co…
moderakh Jun 4, 2019
d270c28
fixed test
moderakh Jun 10, 2019
2118c01
Merge branch 'master' into users/moderakh/global-strong-draft-bugfix
moderakh Jun 13, 2019
52d7e07
fixed test
moderakh Jun 14, 2019
d1ec5bb
Merge branch 'master' into users/moderakh/global-strong-draft-bugfix
moderakh Jun 17, 2019
4ab8d51
fix tests
moderakh Jun 18, 2019
3b53b0d
travis fix
moderakh Jun 18, 2019
66dc8b8
sync master
Jun 20, 2019
284c8ba
Removed skip exception for TCP as the test is disabled for all protoc…
kushagraThapar Jun 20, 2019
8bdfbd2
update
moderakh Jun 20, 2019
e05f1fa
try/catch for diagnostic string
moderakh Jun 20, 2019
359e552
Merge branch 'master' into users/moderakh/global-strong-draft-bugfix
moderakh Jun 20, 2019
fc24b4b
fixed compilation error
moderakh Jun 21, 2019
56bd5c3
fixed test
moderakh Jun 21, 2019
9f1f167
fixed test
moderakh Jun 21, 2019
967b5ed
Merge branch 'users/moderakh/global-strong-draft-bugfix' into users/c…
moderakh Jun 21, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,10 @@ sudo: false # faster builds
matrix:
include:
- os: linux
jdk: oraclejdk8
# - os: osx
# osx_image: xcode8
jdk: openjdk8
script:
- mvn dependency:resolve
- mvn test -P fast -DargLine="-DACCOUNT_HOST=$ACCOUNT_HOST -DACCOUNT_KEY=$ACCOUNT_KEY" cobertura:cobertura
- mvn package
after_success:
- bash <(curl -s https://codecov.io/bash)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@

import java.util.concurrent.TimeUnit;

public class RetryAnalyzier extends RetryAnalyzerCount {
private final Logger logger = LoggerFactory.getLogger(RetryAnalyzier.class);
public class RetryAnalyzer extends RetryAnalyzerCount {
private final Logger logger = LoggerFactory.getLogger(RetryAnalyzer.class);
private final int waitBetweenRetriesInSeconds = 120;

public RetryAnalyzier() {
public RetryAnalyzer() {
this.setCount(Integer.parseInt(TestConfigurations.MAX_RETRY_LIMIT));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,9 +187,10 @@ Single<StoreResponse> writePrivateAsync(
try {
request.requestContext.clientSideRequestStatistics.recordResponse(request,
storeReader.createStoreResult(null, ex, false, false, primaryUri));
} catch (DocumentClientException e) {
} catch (Exception e) {
logger.error("Error occurred while recording response", e);
}

String value = ex.getResponseHeaders().get(HttpConstants.HttpHeaders.WRITE_REQUEST_TRIGGER_ADDRESS_REFRESH);
if (!Strings.isNullOrWhiteSpace(value)) {
Integer result = Integers.tryParse(value);
Expand All @@ -208,7 +209,7 @@ Single<StoreResponse> writePrivateAsync(
try {
request.requestContext.clientSideRequestStatistics.recordResponse(request,
storeReader.createStoreResult(response, null, false, false, primaryURI.get()));
} catch (DocumentClientException e) {
} catch (Exception e) {
logger.error("Error occurred while recording response", e);
}
return barrierForGlobalStrong(request, response);
Expand Down Expand Up @@ -321,51 +322,41 @@ private Single<Boolean> waitForWriteBarrierAsync(RxDocumentServiceRequest barrie
ReadMode.Strong,
false /*checkMinLsn*/,
false /*forceReadAll*/);
return storeResultListObs.flatMap(
return storeResultListObs.toObservable().flatMap(
responses -> {
if (responses != null && responses.stream().anyMatch(response -> response.globalCommittedLSN >= selectedGlobalCommittedLsn)) {
return Single.just(Boolean.TRUE);
return Observable.just(Boolean.TRUE);
}

//get max global committed lsn from current batch of responses, then update if greater than max of all batches.
long maxGlobalCommittedLsn = (responses != null || !responses.isEmpty()) ?
(Long) responses.stream().map(s -> s.globalCommittedLSN).max(ComparatorUtils.NATURAL_COMPARATOR).get() :
0l;
maxGlobalCommittedLsnReceived.set(maxGlobalCommittedLsnReceived.get() > maxGlobalCommittedLsn ?
maxGlobalCommittedLsnReceived.get() : maxGlobalCommittedLsn);
maxGlobalCommittedLsnReceived.get() : maxGlobalCommittedLsn);

//only refresh on first barrier call, set to false for subsequent attempts.
barrierRequest.requestContext.forceRefreshAddressCache = false;

//trace on last retry.
//get max global committed lsn from current batch of responses, then update if greater than max of all batches.
if (writeBarrierRetryCount.getAndDecrement() == 0) {
logger.debug("ConsistencyWriter: WaitForWriteBarrierAsync - Last barrier multi-region strong. Responses: {}",
String.join("; ", responses.stream().map(r -> r.toString()).collect(Collectors.toList())));
String.join("; ", responses.stream().map(r -> r.toString()).collect(Collectors.toList())));
logger.debug("ConsistencyWriter: Highest global committed lsn received for write barrier call is {}", maxGlobalCommittedLsnReceived);
return Observable.just(false);
}

return Single.just(null);
}).toObservable();
}).repeatWhen(s -> {
if (writeBarrierRetryCount.get() == 0) {
return Observable.empty();
} else {

return Observable.empty();
});
}).repeatWhen(s -> s.flatMap(x -> {
// repeat with a delay
if ((ConsistencyWriter.MAX_NUMBER_OF_WRITE_BARRIER_READ_RETRIES - writeBarrierRetryCount.get()) > ConsistencyWriter.MAX_SHORT_BARRIER_RETRIES_FOR_MULTI_REGION) {
return Observable.timer(ConsistencyWriter.DELAY_BETWEEN_WRITE_BARRIER_CALLS_IN_MS, TimeUnit.MILLISECONDS);
} else {
return Observable.timer(ConsistencyWriter.SHORT_BARRIER_RETRY_INTERVAL_IN_MS_FOR_MULTI_REGION, TimeUnit.MILLISECONDS);
}
}
}).take(1)
.map(r -> {
if (r == null) {
// after retries exhausted print this log and return false
logger.debug("ConsistencyWriter: Highest global committed lsn received for write barrier call is {}", maxGlobalCommittedLsnReceived);

return false;
}
return r;
}).toSingle();
})
).take(1).toSingle();
}

static void getLsnAndGlobalCommittedLsn(StoreResponse response, Utils.ValueHolder<Long> lsn, Utils.ValueHolder<Long> globalCommittedLsn) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,11 @@ private Observable<List<StoreResult>> readFromReplicas(List<StoreResult> resultC
for (StoreResult srr : newStoreResults) {

entity.requestContext.requestChargeTracker.addCharge(srr.requestCharge);
entity.requestContext.clientSideRequestStatistics.recordResponse(entity, srr);
try {
entity.requestContext.clientSideRequestStatistics.recordResponse(entity, srr);
} catch (Exception e) {
logger.error("unexpected failure failure", e);
}
if (srr.isValid) {

try {
Expand Down Expand Up @@ -579,7 +583,11 @@ private Single<ReadReplicaResult> readPrimaryInternalAsync(
});

return storeResultObs.map(storeResult -> {
entity.requestContext.clientSideRequestStatistics.recordResponse(entity, storeResult);
try {
entity.requestContext.clientSideRequestStatistics.recordResponse(entity, storeResult);
} catch (Exception e) {
logger.error("unexpected failure failure", e);
}
entity.requestContext.requestChargeTracker.addCharge(storeResult.requestCharge);

if (storeResult.isGoneException && !storeResult.isInvalidPartitionException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,11 @@
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import rx.Completable;
import rx.Observable;
import rx.observable.ListenableFutureObservable;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
Expand Down Expand Up @@ -93,10 +95,13 @@ public void setUp() {

int numberOfDocuments = 20;
// Add documents
List<Completable> tasks = new ArrayList<>();
for (int i = 0; i < numberOfDocuments; i++) {
Document doc = new Document(String.format("{ 'id': 'loc%d', 'counter': %d}", i, i));
client.createDocument(getCollectionLink(), doc, null, true).toBlocking().single();
tasks.add(client.createDocument(getCollectionLink(), doc, null, true).toCompletable());
}

Completable.merge(tasks).await();
}

@AfterClass(groups = "samples", timeOut = TIMEOUT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import rx.Completable;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action1;
Expand Down Expand Up @@ -119,11 +120,13 @@ public void setUp() {
.toBlocking().single().getResource();

numberOfDocuments = 20;
// Add documents
List<Completable> tasks = new ArrayList<>();
for (int i = 0; i < numberOfDocuments; i++) {
Document doc = new Document(String.format("{ 'id': 'loc%d', 'counter': %d}", i, i));
client.createDocument(getCollectionLink(), doc, null, true).toBlocking().single();
tasks.add(client.createDocument(getCollectionLink(), doc, null, true).toCompletable());
}

Completable.merge(tasks).await();
}

@AfterClass(groups = "samples", timeOut = TIMEOUT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import rx.observables.GroupedObservable;

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

Expand Down Expand Up @@ -79,6 +80,8 @@ public void setUp() throws Exception {
int numberOfPayers = 10;
int numberOfDocumentsPerPayer = 10;

ArrayList<rx.Observable> list = new ArrayList<>();

for (int i = 0; i < numberOfPayers; i++) {

for (int j = 0; j < numberOfDocumentsPerPayer; j++) {
Expand All @@ -91,11 +94,11 @@ public void setUp() throws Exception {
+ "'payer_id': %d, "
+ " 'created_time' : %d "
+ "}", UUID.randomUUID().toString(), i, currentTime.getSecond()));
client.createDocument(getCollectionLink(), doc, null, true).toBlocking().single();

Thread.sleep(100);
list.add(client.createDocument(getCollectionLink(), doc, null, true));
}
}

Observable.merge(list.toArray(new Observable[0]), 1).toCompletable().await();
System.out.println("finished inserting documents");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,11 @@ public Single<List<Address>> answer(InvocationOnMock invocationOnMock) throws Th
.describedAs("getServerAddressesViaGatewayAsync will read addresses from gateway")
.asList().hasSize(1);
httpClientWrapper.capturedRequest.clear();
assertThat(suboptimalAddresses).hasSize(ServiceConfig.SystemReplicationPolicy.MaxReplicaSetSize - 1);

// relaxes one replica being down
assertThat(suboptimalAddresses.length).isLessThanOrEqualTo((ServiceConfig.SystemReplicationPolicy.MaxReplicaSetSize - 1));
assertThat(suboptimalAddresses.length).isGreaterThanOrEqualTo(ServiceConfig.SystemReplicationPolicy.MaxReplicaSetSize - 2);

assertThat(fetchCounter.get()).isEqualTo(1);

// no refresh, use cache
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

import com.microsoft.azure.cosmosdb.DatabaseForTest;
import com.microsoft.azure.cosmosdb.PartitionKeyDefinition;
import com.microsoft.azure.cosmosdb.RetryAnalyzier;
import com.microsoft.azure.cosmosdb.RetryAnalyzer;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
Expand Down Expand Up @@ -259,7 +259,7 @@ public void replaceCollection(String collectionName, boolean isNameBased) {
safeDeleteAllCollections(client, database);
}

@Test(groups = { "emulator" }, timeOut = 10 * TIMEOUT, retryAnalyzer = RetryAnalyzier.class)
@Test(groups = { "emulator" }, timeOut = 10 * TIMEOUT, retryAnalyzer = RetryAnalyzer.class)
public void sessionTokenConsistencyCollectionDeleteCreateSameName() {
AsyncDocumentClient client1 = clientBuilder().build();
AsyncDocumentClient client2 = clientBuilder().build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
import com.microsoft.azure.cosmosdb.Database;
import com.microsoft.azure.cosmosdb.Document;
import com.microsoft.azure.cosmosdb.DocumentCollection;
import com.microsoft.azure.cosmosdb.internal.directconnectivity.Protocol;
import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient.Builder;
import org.testng.SkipException;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Factory;
import org.testng.annotations.Test;
Expand All @@ -52,7 +54,6 @@ public DocumentClientResourceLeakTest(Builder clientBuilder) {

@Test(enabled = false, groups = {"emulator"}, timeOut = TIMEOUT)
public void resourceLeak() throws Exception {

System.gc();
TimeUnit.SECONDS.sleep(10);
long usedMemoryInBytesBefore = (Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory());
Expand All @@ -79,7 +80,7 @@ public void resourceLeak() throws Exception {
usedMemoryInBytesBefore / (double)ONE_MB,
(usedMemoryInBytesAfter - usedMemoryInBytesBefore) / (double)ONE_MB);

assertThat(usedMemoryInBytesAfter - usedMemoryInBytesBefore).isLessThan(275 * ONE_MB);
assertThat(usedMemoryInBytesAfter - usedMemoryInBytesBefore).isLessThan(300 * ONE_MB);
}

@BeforeClass(enabled = false, groups = {"emulator"}, timeOut = SETUP_TIMEOUT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -397,9 +397,11 @@ public void upsertDocument_ReplaceDocument(String documentId, boolean isNameBase
}

@BeforeClass(groups = { "simple" }, timeOut = SETUP_TIMEOUT)
public void beforeClass() {
public void beforeClass() throws Exception {
createdDatabase = SHARED_DATABASE;
createdCollection = SHARED_MULTI_PARTITION_COLLECTION;
TimeUnit.SECONDS.sleep(1);

}

@AfterClass(groups = { "simple" }, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true)
Expand All @@ -408,7 +410,7 @@ public void afterClass() {
}

@BeforeMethod(groups = { "simple" }, timeOut = SETUP_TIMEOUT)
public void beforeMethod() {
public void beforeMethod() throws Exception {
safeClose(client);
client = this.clientBuilder().build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import java.util.function.Function;
import java.util.stream.Collectors;

import com.microsoft.azure.cosmosdb.RetryAnalyzier;
import com.microsoft.azure.cosmosdb.RetryAnalyzer;
import org.apache.commons.lang3.StringUtils;

import com.fasterxml.jackson.core.JsonProcessingException;
Expand All @@ -60,7 +60,6 @@
import com.microsoft.azure.cosmosdb.rx.internal.query.CompositeContinuationToken;
import com.microsoft.azure.cosmosdb.rx.internal.query.OrderByContinuationToken;

import org.testng.util.RetryAnalyzerCount;
import rx.Observable;
import rx.observers.TestSubscriber;

Expand Down Expand Up @@ -374,7 +373,7 @@ public void orderByContinuationTokenRoundTrip() throws Exception {
}
}
@Test(groups = { "simple" }, timeOut = TIMEOUT * 10, dataProvider = "sortOrder",
retryAnalyzer = RetryAnalyzier.class)
retryAnalyzer = RetryAnalyzer.class)
public void queryDocumentsWithOrderByContinuationTokensInteger(String sortOrder) throws Exception {
// Get Actual
String query = String.format("SELECT * FROM c ORDER BY c.propInt %s", sortOrder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,12 @@ private static <T> ImmutableList<T> immutableListOrNull(List<T> list) {
}

static {
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
objectMapper.configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true);
objectMapper.configure(JsonParser.Feature.ALLOW_TRAILING_COMMA, true);
objectMapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
objectMapper.configure(JsonParser.Feature.STRICT_DUPLICATE_DETECTION, true);

accountConsistency = parseConsistency(TestConfigurations.CONSISTENCY);
desiredConsistencies = immutableListOrNull(
ObjectUtils.defaultIfNull(parseDesiredConsistencies(TestConfigurations.DESIRED_CONSISTENCIES),
Expand All @@ -123,14 +129,7 @@ protected TestSuiteBase() {
}

protected TestSuiteBase(AsyncDocumentClient.Builder clientBuilder) {

super(clientBuilder);

objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
objectMapper.configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true);
objectMapper.configure(JsonParser.Feature.ALLOW_TRAILING_COMMA, true);
objectMapper.configure(JsonParser.Feature.STRICT_DUPLICATE_DETECTION, true);

logger.debug("Initializing {} ...", this.getClass().getSimpleName());
}

Expand Down Expand Up @@ -163,7 +162,7 @@ public Observable<ResourceResponse<Database>> deleteDatabase(String id) {
}

@BeforeSuite(groups = {"simple", "long", "direct", "multi-master", "emulator", "non-emulator"}, timeOut = SUITE_SETUP_TIMEOUT)
public static void beforeSuite() {
public static void beforeSuite() throws Exception {
logger.info("beforeSuite Started");
AsyncDocumentClient houseKeepingClient = createGatewayHouseKeepingDocumentClient().build();
try {
Expand All @@ -178,6 +177,8 @@ public static void beforeSuite() {
} finally {
houseKeepingClient.close();
}

TimeUnit.SECONDS.sleep(10);
}

@AfterSuite(groups = {"simple", "long", "direct", "multi-master", "emulator", "non-emulator"}, timeOut = SUITE_SHUTDOWN_TIMEOUT)
Expand Down
Loading