Skip to content
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
b0ca917
Reactor Netty Milestone release changes
kushagraThapar Aug 30, 2019
5b4d442
Updated Milestone version to RC version
kushagraThapar Sep 4, 2019
93d9a91
Code review comments:
kushagraThapar Sep 5, 2019
c474f75
Updated reactor release and reactor netty release
kushagraThapar Sep 30, 2019
d6087c4
Setting default connections to 1000, and fixed Exception handling by …
kushagraThapar Oct 1, 2019
13fb2a0
Merge branch 'master' into reactor_netty_milestone_release
kushagraThapar Oct 1, 2019
fc123c3
Unwrapping exception
kushagraThapar Oct 1, 2019
4d38f26
Merge branch 'master' into reactor_netty_milestone_release
kushagraThapar Oct 1, 2019
3d672fb
Fixed validate failure by unwrapping exception
kushagraThapar Oct 1, 2019
098365b
Merge branch 'master' into reactor_netty_milestone_release
kushagraThapar Oct 2, 2019
d2e2569
Fixed multi-master conflict resolution test
kushagraThapar Oct 2, 2019
a201fea
Unwrapping exceptions wherever possible to make sure we check on inne…
kushagraThapar Oct 2, 2019
f6dfd06
Fixed compilation errors
kushagraThapar Oct 3, 2019
35530b4
Merge branch 'master' into reactor_netty_milestone_release
kushagraThapar Oct 3, 2019
0a3e149
Merge branch 'master' into reactor_netty_milestone_release
kushagraThapar Oct 3, 2019
4e54cc3
Fixed doOnError in Store Client and Consistency Writer to handle unwr…
kushagraThapar Oct 3, 2019
9533896
Handling empty response from backend
kushagraThapar Oct 4, 2019
3ffcb75
Updating number of documents and collection throughput size
kushagraThapar Oct 12, 2019
fcbc679
Updated Feed Response Validator for query metrics
kushagraThapar Oct 12, 2019
e3c0920
Fixed query documents with aggregate test
kushagraThapar Oct 14, 2019
4f01fd2
Updated retrieved documents count
kushagraThapar Oct 14, 2019
7641a2d
Updated getLogicalPlanBuildTime for query metrics validation
kushagraThapar Oct 15, 2019
5973dfe
Unused imports
kushagraThapar Oct 15, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class Configuration {
private int documentDataFieldSize = 20;

@Parameter(names = "-maxConnectionPoolSize", description = "Max Connection Pool Size")
private Integer maxConnectionPoolSize = 1000;
private Integer maxConnectionPoolSize = 500;
Comment thread
kushagraThapar marked this conversation as resolved.

@Parameter(names = "-consistencyLevel", description = "Consistency Level", converter = ConsistencyLevelConverter.class)
private ConsistencyLevel consistencyLevel = ConsistencyLevel.SESSION;
Expand Down
1 change: 1 addition & 0 deletions sdk/cosmos/microsoft-azure-cosmos/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ Licensed under the MIT License.
<dependency>
<groupId>io.projectreactor.netty</groupId>
<artifactId>reactor-netty</artifactId>
<version>${reactor-netty.version}</version>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.azure.data.cosmos.internal.Permission;
import com.azure.data.cosmos.internal.directconnectivity.rntbd.RntbdMetrics;
import io.micrometer.core.instrument.MeterRegistry;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand Down Expand Up @@ -167,14 +168,15 @@ public Mono<CosmosDatabaseResponse> createDatabaseIfNotExists(String id) {
}

private Mono<CosmosDatabaseResponse> createDatabaseIfNotExistsInternal(CosmosDatabase database){
return database.read().onErrorResume(exception -> {
if (exception instanceof CosmosClientException) {
CosmosClientException cosmosClientException = (CosmosClientException) exception;
return database.read().onErrorResume(t -> {
Throwable throwable = Exceptions.unwrap(t);
if (throwable instanceof CosmosClientException) {
CosmosClientException cosmosClientException = (CosmosClientException) throwable;
if (cosmosClientException.statusCode() == HttpConstants.StatusCodes.NOTFOUND) {
return createDatabase(new CosmosDatabaseProperties(database.id()), new CosmosDatabaseRequestOptions());
}
}
return Mono.error(exception);
return Mono.error(throwable);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.azure.data.cosmos.internal.Offer;
import com.azure.data.cosmos.internal.Paths;
import org.apache.commons.lang3.StringUtils;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand Down Expand Up @@ -308,13 +309,14 @@ public Mono<CosmosContainerResponse> createContainerIfNotExists(String id, Strin
private Mono<CosmosContainerResponse> createContainerIfNotExistsInternal(
CosmosContainerProperties containerProperties, CosmosContainer container, CosmosContainerRequestOptions options) {
return container.read(options).onErrorResume(exception -> {
if (exception instanceof CosmosClientException) {
CosmosClientException cosmosClientException = (CosmosClientException) exception;
Throwable unwrappedException = Exceptions.unwrap(exception);
if (unwrappedException instanceof CosmosClientException) {
CosmosClientException cosmosClientException = (CosmosClientException) unwrappedException;
if (cosmosClientException.statusCode() == HttpConstants.StatusCodes.NOTFOUND) {
return createContainer(containerProperties, options);
}
}
return Mono.error(exception);
return Mono.error(unwrappedException);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ public class Configs {
private static final int CPU_CNT = Runtime.getRuntime().availableProcessors();
private static final int DEFAULT_DIRECT_HTTPS_POOL_SIZE = CPU_CNT * 500;

// Reactor Netty Constants
private static final int MAX_IDLE_CONNECTION_TIMEOUT_IN_MILLIS = 60 * 1000;
private static final int CONNECTION_ACQUIRE_TIMEOUT_IN_MILLIS = 45 * 1000;
Comment thread
kushagraThapar marked this conversation as resolved.
private static final int REACTOR_NETTY_MAX_CONNECTION_POOL_SIZE = 500;
private static final String REACTOR_NETTY_CONNECTION_POOL_NAME = "reactor-netty-connection-pool";

public Configs() {
Expand Down Expand Up @@ -147,6 +151,18 @@ public String getReactorNettyConnectionPoolName() {
return REACTOR_NETTY_CONNECTION_POOL_NAME;
}

public int getMaxIdleConnectionTimeoutInMillis() {
return MAX_IDLE_CONNECTION_TIMEOUT_IN_MILLIS;
}

public int getConnectionAcquireTimeoutInMillis() {
return CONNECTION_ACQUIRE_TIMEOUT_IN_MILLIS;
}

public int getReactorNettyMaxConnectionPoolSize() {
return REACTOR_NETTY_MAX_CONNECTION_POOL_SIZE;
}

private static String getJVMConfigAsString(String propName, String defaultValue) {
String propValue = System.getProperty(propName);
return StringUtils.defaultString(propValue, defaultValue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.util.internal.StringUtil;
Comment thread
kushagraThapar marked this conversation as resolved.
Outdated
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
Expand All @@ -43,7 +43,6 @@
*/
class RxGatewayStoreModel implements RxStoreModel {

private final static int INITIAL_RESPONSE_BUFFER_SIZE = 1024;
private final Logger logger = LoggerFactory.getLogger(RxGatewayStoreModel.class);
private final Map<String, String> defaultHeaders;
private final HttpClient httpClient;
Expand Down Expand Up @@ -229,22 +228,6 @@ private String ensureSlashPrefixed(String path) {
return "/" + path;
}

private Mono<String> toString(Flux<ByteBuf> contentObservable) {
return contentObservable
.reduce(
new ByteArrayOutputStream(INITIAL_RESPONSE_BUFFER_SIZE),
(out, bb) -> {
try {
bb.readBytes(out, bb.readableBytes());
return out;
}
catch (IOException e) {
throw new RuntimeException(e);
}
})
.map(out -> new String(out.toByteArray(), StandardCharsets.UTF_8));
}

/**
* Transforms the reactor netty's client response Observable to RxDocumentServiceResponse Observable.
*
Expand All @@ -259,102 +242,63 @@ private Mono<String> toString(Flux<ByteBuf> contentObservable) {
private Flux<RxDocumentServiceResponse> toDocumentServiceResponse(Mono<HttpResponse> httpResponseMono,
RxDocumentServiceRequest request) {

if (request.getIsMedia()) {
return httpResponseMono.flatMap(httpResponse -> {
return httpResponseMono.flatMap(httpResponse -> {

// header key/value pairs
HttpHeaders httpResponseHeaders = httpResponse.headers();
int httpResponseStatus = httpResponse.statusCode();
// header key/value pairs
HttpHeaders httpResponseHeaders = httpResponse.headers();
int httpResponseStatus = httpResponse.statusCode();

Flux<InputStream> inputStreamObservable;

if (request.getOperationType() == OperationType.Delete) {
// for delete we don't expect any body
inputStreamObservable = Flux.just(IOUtils.toInputStream("", StandardCharsets.UTF_8));
} else {
// transforms the ByteBufFlux to Flux<InputStream>
inputStreamObservable = httpResponse
.body()
.flatMap(byteBuf ->
Flux.just(IOUtils.toInputStream(byteBuf.toString(StandardCharsets.UTF_8), StandardCharsets.UTF_8)));
}

return inputStreamObservable
.flatMap(contentInputStream -> {
try {
// If there is any error in the header response this throws exception
// TODO: potential performance improvement: return Observable.error(exception) on failure instead of throwing Exception
validateOrThrow(request,
HttpResponseStatus.valueOf(httpResponseStatus),
httpResponseHeaders,
null,
contentInputStream);

// transforms to Observable<StoreResponse>
StoreResponse rsp = new StoreResponse(httpResponseStatus, HttpUtils
.unescape(httpResponseHeaders.toMap().entrySet()), contentInputStream);
return Flux.just(rsp);
} catch (Exception e) {
return Flux.error(e);
}
}).single();

}).map(RxDocumentServiceResponse::new).flux();

} else {
return httpResponseMono.flatMap(httpResponse -> {
Flux<String> contentObservable;

// header key/value pairs
HttpHeaders httpResponseHeaders = httpResponse.headers();
int httpResponseStatus = httpResponse.statusCode();

Flux<String> contentObservable;

if (request.getOperationType() == OperationType.Delete) {
// for delete we don't expect any body
contentObservable = Flux.just(StringUtils.EMPTY);
} else {
// transforms the ByteBufFlux to Flux<String>
contentObservable = toString(httpResponse.body()).flux();
}
if (request.getOperationType() == OperationType.Delete) {
// for delete we don't expect any body
contentObservable = Flux.just(StringUtils.EMPTY);
} else {
// transforms the ByteBufFlux to Flux<String>
contentObservable = httpResponse
.bodyAsString()
.switchIfEmpty(Mono.just(StringUtils.EMPTY))
.flux();
}

return contentObservable
.flatMap(content -> {
try {
// If there is any error in the header response this throws exception
// TODO: potential performance improvement: return Observable.error(exception) on failure instead of throwing Exception
validateOrThrow(request, HttpResponseStatus.valueOf(httpResponseStatus), httpResponseHeaders, content, null);

// transforms to Observable<StoreResponse>
StoreResponse rsp = new StoreResponse(httpResponseStatus,
HttpUtils.unescape(httpResponseHeaders.toMap().entrySet()),
content);
return Flux.just(rsp);
} catch (Exception e) {
return Flux.error(e);
}
}).single();

}).map(RxDocumentServiceResponse::new)
.onErrorResume(throwable -> {
if (!(throwable instanceof Exception)) {
// fatal error
logger.error("Unexpected failure {}", throwable.getMessage(), throwable);
return Mono.error(throwable);
}

Exception exception = (Exception) throwable;
if (!(exception instanceof CosmosClientException)) {
// wrap in CosmosClientException
logger.error("Network failure", exception);
CosmosClientException dce = BridgeInternal.createCosmosClientException(0, exception);
BridgeInternal.setRequestHeaders(dce, request.getHeaders());
return Mono.error(dce);
}

return Mono.error(exception);
}).flux();
}
return contentObservable
.flatMap(content -> {
try {
// If there is any error in the header response this throws exception
// TODO: potential performance improvement: return Observable.error(exception) on failure instead of throwing Exception
validateOrThrow(request, HttpResponseStatus.valueOf(httpResponseStatus), httpResponseHeaders, content, null);

// transforms to Observable<StoreResponse>
StoreResponse rsp = new StoreResponse(httpResponseStatus,
HttpUtils.unescape(httpResponseHeaders.toMap().entrySet()),
content);
return Flux.just(rsp);
} catch (Exception e) {
return Flux.error(e);
}
})
.single();

}).map(RxDocumentServiceResponse::new)
.onErrorResume(throwable -> {
Throwable unwrappedException = reactor.core.Exceptions.unwrap(throwable);
if (!(unwrappedException instanceof Exception)) {
// fatal error
logger.error("Unexpected failure {}", unwrappedException.getMessage(), unwrappedException);
return Mono.error(unwrappedException);
}

Exception exception = (Exception) unwrappedException;
if (!(exception instanceof CosmosClientException)) {
// wrap in CosmosClientException
logger.error("Network failure", exception);
CosmosClientException dce = BridgeInternal.createCosmosClientException(0, exception);
BridgeInternal.setRequestHeaders(dce, request.getHeaders());
return Mono.error(dce);
}

return Mono.error(exception);
}).flux();
}

private void validateOrThrow(RxDocumentServiceRequest request, HttpResponseStatus status, HttpHeaders headers, String body,
Expand Down Expand Up @@ -498,4 +442,4 @@ private void applySessionToken(RxDocumentServiceRequest request) {
headers.put(HttpConstants.HttpHeaders.SESSION_TOKEN, sessionToken);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.azure.data.cosmos.internal.RxDocumentServiceRequest;
import com.azure.data.cosmos.internal.routing.PartitionKeyRangeIdentity;
import org.apache.commons.lang3.StringUtils;
import reactor.core.Exceptions;
import reactor.core.publisher.Mono;

import java.util.Map;
Expand Down Expand Up @@ -105,13 +106,14 @@ private Mono<DocumentCollection> resolveByPartitionKeyRangeIdentityAsync(Partiti
// which contains value "<collectionrid>,<partitionkeyrangeid>", then resolve to collection rid in this header.
if (partitionKeyRangeIdentity != null && partitionKeyRangeIdentity.getCollectionRid() != null) {
return this.resolveByRidAsync(partitionKeyRangeIdentity.getCollectionRid(), properties)
.onErrorResume(e -> {
if (e instanceof NotFoundException) {
.onErrorResume(t -> {
Throwable unwrappedException = Exceptions.unwrap(t);
if (unwrappedException instanceof NotFoundException) {
// This is signal to the upper logic either to refresh
// collection cache and retry.
return Mono.error(new InvalidPartitionException(RMResources.InvalidDocumentCollection));
}
return Mono.error(e);
return Mono.error(unwrappedException);

});
}
Expand Down Expand Up @@ -165,7 +167,7 @@ private Mono<Void> refreshAsync(RxDocumentServiceRequest request) {
});
}).then();
} else {
// In case of ForceRefresh directive coming from client, there will be no ResolvedCollectionRid, so we
// In case of ForceRefresh directive coming from client, there will be no ResolvedCollectionRid, so we
// need to refresh unconditionally.
mono = Mono.fromRunnable(() -> this.refresh(request.getResourceAddress(), request.properties));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.commons.collections4.ComparatorUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
Expand Down Expand Up @@ -164,7 +165,8 @@ Mono<StoreResponse> writePrivateAsync(
.doOnError(
t -> {
try {
CosmosClientException ex = Utils.as(t, CosmosClientException.class);
Throwable unwrappedException = Exceptions.unwrap(t);
CosmosClientException ex = Utils.as(unwrappedException, CosmosClientException.class);
try {
BridgeInternal.recordResponse(request.requestContext.cosmosResponseDiagnostics, request,
storeReader.createStoreResult(null, ex, false, false, primaryUri));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public class ErrorUtils {
private static final Logger logger = LoggerFactory.getLogger(ErrorUtils.class);

static Mono<String> getErrorResponseAsync(HttpResponse responseMessage, HttpRequest request) {
Mono<String> responseAsString = ResponseUtils.toString(responseMessage.body());
Mono<String> responseAsString = responseMessage.bodyAsString();
if (request.httpMethod() == HttpMethod.DELETE) {
return Mono.just(StringUtils.EMPTY);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,13 +202,14 @@ public Mono<AddressInformation[]> tryGetAddresses(RxDocumentServiceRequest reque
}

return addresses;
}).onErrorResume(ex -> {
CosmosClientException dce = com.azure.data.cosmos.internal.Utils.as(ex, CosmosClientException.class);
}).onErrorResume(throwable -> {
Throwable unwrappedException = reactor.core.Exceptions.unwrap(throwable);
CosmosClientException dce = com.azure.data.cosmos.internal.Utils.as(unwrappedException, CosmosClientException.class);
if (dce == null) {
if (forceRefreshPartitionAddressesModified) {
this.suboptimalServerPartitionTimestamps.remove(partitionKeyRangeIdentity);
}
return Mono.error(ex);
return Mono.error(unwrappedException);
} else {
if (Exceptions.isStatusCode(dce, HttpConstants.StatusCodes.NOTFOUND) ||
Exceptions.isStatusCode(dce, HttpConstants.StatusCodes.GONE) ||
Expand All @@ -217,7 +218,7 @@ public Mono<AddressInformation[]> tryGetAddresses(RxDocumentServiceRequest reque
this.suboptimalServerPartitionTimestamps.remove(partitionKeyRangeIdentity);
return null;
}
return Mono.error(ex);
return Mono.error(unwrappedException);
}

});
Expand Down
Loading