Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,10 @@ sudo: false # faster builds
matrix:
include:
- os: linux
jdk: oraclejdk8
# - os: osx
# osx_image: xcode8
jdk: openjdk8
script:
- mvn dependency:resolve
- mvn test -P fast -DargLine="-DACCOUNT_HOST=$ACCOUNT_HOST -DACCOUNT_KEY=$ACCOUNT_KEY" cobertura:cobertura
- mvn package
after_success:
- bash <(curl -s https://codecov.io/bash)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@

import java.util.concurrent.TimeUnit;

public class RetryAnalyzier extends RetryAnalyzerCount {
private final Logger logger = LoggerFactory.getLogger(RetryAnalyzier.class);
public class RetryAnalyzer extends RetryAnalyzerCount {
private final Logger logger = LoggerFactory.getLogger(RetryAnalyzer.class);
private final int waitBetweenRetriesInSeconds = 120;

public RetryAnalyzier() {
public RetryAnalyzer() {
this.setCount(Integer.parseInt(TestConfigurations.MAX_RETRY_LIMIT));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,9 +187,10 @@ Single<StoreResponse> writePrivateAsync(
try {
request.requestContext.clientSideRequestStatistics.recordResponse(request,
storeReader.createStoreResult(null, ex, false, false, primaryUri));
} catch (DocumentClientException e) {
} catch (Exception e) {
logger.error("Error occurred while recording response", e);
}

String value = ex.getResponseHeaders().get(HttpConstants.HttpHeaders.WRITE_REQUEST_TRIGGER_ADDRESS_REFRESH);
if (!Strings.isNullOrWhiteSpace(value)) {
Integer result = Integers.tryParse(value);
Expand All @@ -208,7 +209,7 @@ Single<StoreResponse> writePrivateAsync(
try {
request.requestContext.clientSideRequestStatistics.recordResponse(request,
storeReader.createStoreResult(response, null, false, false, primaryURI.get()));
} catch (DocumentClientException e) {
} catch (Exception e) {
logger.error("Error occurred while recording response", e);
}
return barrierForGlobalStrong(request, response);
Expand Down Expand Up @@ -321,51 +322,41 @@ private Single<Boolean> waitForWriteBarrierAsync(RxDocumentServiceRequest barrie
ReadMode.Strong,
false /*checkMinLsn*/,
false /*forceReadAll*/);
return storeResultListObs.flatMap(
return storeResultListObs.toObservable().flatMap(
responses -> {
if (responses != null && responses.stream().anyMatch(response -> response.globalCommittedLSN >= selectedGlobalCommittedLsn)) {
return Single.just(Boolean.TRUE);
return Observable.just(Boolean.TRUE);
}

//get max global committed lsn from current batch of responses, then update if greater than max of all batches.
long maxGlobalCommittedLsn = (responses != null || !responses.isEmpty()) ?
(Long) responses.stream().map(s -> s.globalCommittedLSN).max(ComparatorUtils.NATURAL_COMPARATOR).get() :
0l;
maxGlobalCommittedLsnReceived.set(maxGlobalCommittedLsnReceived.get() > maxGlobalCommittedLsn ?
maxGlobalCommittedLsnReceived.get() : maxGlobalCommittedLsn);
maxGlobalCommittedLsnReceived.get() : maxGlobalCommittedLsn);

//only refresh on first barrier call, set to false for subsequent attempts.
barrierRequest.requestContext.forceRefreshAddressCache = false;

//trace on last retry.
//get max global committed lsn from current batch of responses, then update if greater than max of all batches.
if (writeBarrierRetryCount.getAndDecrement() == 0) {
logger.debug("ConsistencyWriter: WaitForWriteBarrierAsync - Last barrier multi-region strong. Responses: {}",
String.join("; ", responses.stream().map(r -> r.toString()).collect(Collectors.toList())));
String.join("; ", responses.stream().map(r -> r.toString()).collect(Collectors.toList())));
logger.debug("ConsistencyWriter: Highest global committed lsn received for write barrier call is {}", maxGlobalCommittedLsnReceived);
return Observable.just(false);
}

return Single.just(null);
}).toObservable();
}).repeatWhen(s -> {
if (writeBarrierRetryCount.get() == 0) {
return Observable.empty();
} else {

return Observable.empty();
});
}).repeatWhen(s -> s.flatMap(x -> {
// repeat with a delay
if ((ConsistencyWriter.MAX_NUMBER_OF_WRITE_BARRIER_READ_RETRIES - writeBarrierRetryCount.get()) > ConsistencyWriter.MAX_SHORT_BARRIER_RETRIES_FOR_MULTI_REGION) {
return Observable.timer(ConsistencyWriter.DELAY_BETWEEN_WRITE_BARRIER_CALLS_IN_MS, TimeUnit.MILLISECONDS);
} else {
return Observable.timer(ConsistencyWriter.SHORT_BARRIER_RETRY_INTERVAL_IN_MS_FOR_MULTI_REGION, TimeUnit.MILLISECONDS);
}
}
}).take(1)
.map(r -> {
if (r == null) {
// after retries exhausted print this log and return false
logger.debug("ConsistencyWriter: Highest global committed lsn received for write barrier call is {}", maxGlobalCommittedLsnReceived);

return false;
}
return r;
}).toSingle();
})
).take(1).toSingle();
}

static void getLsnAndGlobalCommittedLsn(StoreResponse response, Utils.ValueHolder<Long> lsn, Utils.ValueHolder<Long> globalCommittedLsn) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,11 @@ private Observable<List<StoreResult>> readFromReplicas(List<StoreResult> resultC
for (StoreResult srr : newStoreResults) {

entity.requestContext.requestChargeTracker.addCharge(srr.requestCharge);
entity.requestContext.clientSideRequestStatistics.recordResponse(entity, srr);
try {
entity.requestContext.clientSideRequestStatistics.recordResponse(entity, srr);
} catch (Exception e) {
logger.error("unexpected failure failure", e);
}
if (srr.isValid) {

try {
Expand Down Expand Up @@ -579,7 +583,11 @@ private Single<ReadReplicaResult> readPrimaryInternalAsync(
});

return storeResultObs.map(storeResult -> {
entity.requestContext.clientSideRequestStatistics.recordResponse(entity, storeResult);
try {
entity.requestContext.clientSideRequestStatistics.recordResponse(entity, storeResult);
} catch (Exception e) {
logger.error("unexpected failure failure", e);
}
entity.requestContext.requestChargeTracker.addCharge(storeResult.requestCharge);

if (storeResult.isGoneException && !storeResult.isInvalidPartitionException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,11 @@
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import rx.Completable;
import rx.Observable;
import rx.observable.ListenableFutureObservable;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
Expand Down Expand Up @@ -93,10 +95,13 @@ public void setUp() {

int numberOfDocuments = 20;
// Add documents
List<Completable> tasks = new ArrayList<>();
for (int i = 0; i < numberOfDocuments; i++) {
Document doc = new Document(String.format("{ 'id': 'loc%d', 'counter': %d}", i, i));
client.createDocument(getCollectionLink(), doc, null, true).toBlocking().single();
tasks.add(client.createDocument(getCollectionLink(), doc, null, true).toCompletable());
}

Completable.merge(tasks).await();
}

@AfterClass(groups = "samples", timeOut = TIMEOUT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import rx.Completable;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action1;
Expand Down Expand Up @@ -119,11 +120,13 @@ public void setUp() {
.toBlocking().single().getResource();

numberOfDocuments = 20;
// Add documents
List<Completable> tasks = new ArrayList<>();
for (int i = 0; i < numberOfDocuments; i++) {
Document doc = new Document(String.format("{ 'id': 'loc%d', 'counter': %d}", i, i));
client.createDocument(getCollectionLink(), doc, null, true).toBlocking().single();
tasks.add(client.createDocument(getCollectionLink(), doc, null, true).toCompletable());
}

Completable.merge(tasks).await();
}

@AfterClass(groups = "samples", timeOut = TIMEOUT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import rx.observables.GroupedObservable;

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

Expand Down Expand Up @@ -79,6 +80,8 @@ public void setUp() throws Exception {
int numberOfPayers = 10;
int numberOfDocumentsPerPayer = 10;

ArrayList<rx.Observable> list = new ArrayList<>();

for (int i = 0; i < numberOfPayers; i++) {

for (int j = 0; j < numberOfDocumentsPerPayer; j++) {
Expand All @@ -91,11 +94,11 @@ public void setUp() throws Exception {
+ "'payer_id': %d, "
+ " 'created_time' : %d "
+ "}", UUID.randomUUID().toString(), i, currentTime.getSecond()));
client.createDocument(getCollectionLink(), doc, null, true).toBlocking().single();

Thread.sleep(100);
list.add(client.createDocument(getCollectionLink(), doc, null, true));
}
}

Observable.merge(list.toArray(new Observable[0]), 1).toCompletable().await();
System.out.println("finished inserting documents");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,11 @@ public Single<List<Address>> answer(InvocationOnMock invocationOnMock) throws Th
.describedAs("getServerAddressesViaGatewayAsync will read addresses from gateway")
.asList().hasSize(1);
httpClientWrapper.capturedRequest.clear();
assertThat(suboptimalAddresses).hasSize(ServiceConfig.SystemReplicationPolicy.MaxReplicaSetSize - 1);

// relaxes one replica being down
assertThat(suboptimalAddresses.length).isLessThanOrEqualTo((ServiceConfig.SystemReplicationPolicy.MaxReplicaSetSize - 1));
assertThat(suboptimalAddresses.length).isGreaterThanOrEqualTo(ServiceConfig.SystemReplicationPolicy.MaxReplicaSetSize - 2);

assertThat(fetchCounter.get()).isEqualTo(1);

// no refresh, use cache
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

import com.microsoft.azure.cosmosdb.DatabaseForTest;
import com.microsoft.azure.cosmosdb.PartitionKeyDefinition;
import com.microsoft.azure.cosmosdb.RetryAnalyzier;
import com.microsoft.azure.cosmosdb.RetryAnalyzer;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
Expand Down Expand Up @@ -259,7 +259,7 @@ public void replaceCollection(String collectionName, boolean isNameBased) {
safeDeleteAllCollections(client, database);
}

@Test(groups = { "emulator" }, timeOut = 10 * TIMEOUT, retryAnalyzer = RetryAnalyzier.class)
@Test(groups = { "emulator" }, timeOut = 10 * TIMEOUT, retryAnalyzer = RetryAnalyzer.class)
public void sessionTokenConsistencyCollectionDeleteCreateSameName() {
AsyncDocumentClient client1 = clientBuilder().build();
AsyncDocumentClient client2 = clientBuilder().build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
import com.microsoft.azure.cosmosdb.Database;
import com.microsoft.azure.cosmosdb.Document;
import com.microsoft.azure.cosmosdb.DocumentCollection;
import com.microsoft.azure.cosmosdb.internal.directconnectivity.Protocol;
import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient.Builder;
import org.testng.SkipException;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Factory;
import org.testng.annotations.Test;
Expand All @@ -52,7 +54,6 @@ public DocumentClientResourceLeakTest(Builder clientBuilder) {

@Test(enabled = false, groups = {"emulator"}, timeOut = TIMEOUT)
public void resourceLeak() throws Exception {

System.gc();
TimeUnit.SECONDS.sleep(10);
long usedMemoryInBytesBefore = (Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory());
Expand All @@ -79,7 +80,7 @@ public void resourceLeak() throws Exception {
usedMemoryInBytesBefore / (double)ONE_MB,
(usedMemoryInBytesAfter - usedMemoryInBytesBefore) / (double)ONE_MB);

assertThat(usedMemoryInBytesAfter - usedMemoryInBytesBefore).isLessThan(275 * ONE_MB);
assertThat(usedMemoryInBytesAfter - usedMemoryInBytesBefore).isLessThan(300 * ONE_MB);
}

@BeforeClass(enabled = false, groups = {"emulator"}, timeOut = SETUP_TIMEOUT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -397,9 +397,11 @@ public void upsertDocument_ReplaceDocument(String documentId, boolean isNameBase
}

@BeforeClass(groups = { "simple" }, timeOut = SETUP_TIMEOUT)
public void beforeClass() {
public void beforeClass() throws Exception {
createdDatabase = SHARED_DATABASE;
createdCollection = SHARED_MULTI_PARTITION_COLLECTION;
TimeUnit.SECONDS.sleep(1);

}

@AfterClass(groups = { "simple" }, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true)
Expand All @@ -408,7 +410,7 @@ public void afterClass() {
}

@BeforeMethod(groups = { "simple" }, timeOut = SETUP_TIMEOUT)
public void beforeMethod() {
public void beforeMethod() throws Exception {
safeClose(client);
client = this.clientBuilder().build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import java.util.function.Function;
import java.util.stream.Collectors;

import com.microsoft.azure.cosmosdb.RetryAnalyzier;
import com.microsoft.azure.cosmosdb.RetryAnalyzer;
import org.apache.commons.lang3.StringUtils;

import com.fasterxml.jackson.core.JsonProcessingException;
Expand All @@ -60,7 +60,6 @@
import com.microsoft.azure.cosmosdb.rx.internal.query.CompositeContinuationToken;
import com.microsoft.azure.cosmosdb.rx.internal.query.OrderByContinuationToken;

import org.testng.util.RetryAnalyzerCount;
import rx.Observable;
import rx.observers.TestSubscriber;

Expand Down Expand Up @@ -374,7 +373,7 @@ public void orderByContinuationTokenRoundTrip() throws Exception {
}
}
@Test(groups = { "simple" }, timeOut = TIMEOUT * 10, dataProvider = "sortOrder",
retryAnalyzer = RetryAnalyzier.class)
retryAnalyzer = RetryAnalyzer.class)
public void queryDocumentsWithOrderByContinuationTokensInteger(String sortOrder) throws Exception {
// Get Actual
String query = String.format("SELECT * FROM c ORDER BY c.propInt %s", sortOrder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,12 @@ private static <T> ImmutableList<T> immutableListOrNull(List<T> list) {
}

static {
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
objectMapper.configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true);
objectMapper.configure(JsonParser.Feature.ALLOW_TRAILING_COMMA, true);
objectMapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
objectMapper.configure(JsonParser.Feature.STRICT_DUPLICATE_DETECTION, true);

accountConsistency = parseConsistency(TestConfigurations.CONSISTENCY);
desiredConsistencies = immutableListOrNull(
ObjectUtils.defaultIfNull(parseDesiredConsistencies(TestConfigurations.DESIRED_CONSISTENCIES),
Expand All @@ -123,14 +129,7 @@ protected TestSuiteBase() {
}

protected TestSuiteBase(AsyncDocumentClient.Builder clientBuilder) {

super(clientBuilder);

objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
objectMapper.configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true);
objectMapper.configure(JsonParser.Feature.ALLOW_TRAILING_COMMA, true);
objectMapper.configure(JsonParser.Feature.STRICT_DUPLICATE_DETECTION, true);

logger.debug("Initializing {} ...", this.getClass().getSimpleName());
}

Expand Down Expand Up @@ -163,7 +162,7 @@ public Observable<ResourceResponse<Database>> deleteDatabase(String id) {
}

@BeforeSuite(groups = {"simple", "long", "direct", "multi-master", "emulator", "non-emulator"}, timeOut = SUITE_SETUP_TIMEOUT)
public static void beforeSuite() {
public static void beforeSuite() throws Exception {
logger.info("beforeSuite Started");
AsyncDocumentClient houseKeepingClient = createGatewayHouseKeepingDocumentClient().build();
try {
Expand All @@ -178,6 +177,8 @@ public static void beforeSuite() {
} finally {
houseKeepingClient.close();
}

TimeUnit.SECONDS.sleep(10);
}

@AfterSuite(groups = {"simple", "long", "direct", "multi-master", "emulator", "non-emulator"}, timeOut = SUITE_SHUTDOWN_TIMEOUT)
Expand Down
Loading