Skip to content
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
e3f011f
Fixing exception handling for Gone and Request timeouts
FabianMeiswinkel Sep 25, 2020
758a859
Adding basic unit tests
FabianMeiswinkel Sep 25, 2020
f0c6809
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-java …
FabianMeiswinkel Sep 25, 2020
07ac3ba
Really enforcing AddressRefresh after failing Gone for writes
FabianMeiswinkel Sep 29, 2020
585dea7
Fixing unit test regression
FabianMeiswinkel Sep 29, 2020
9f985dc
Changing log-level to warn if we don't retry
FabianMeiswinkel Sep 29, 2020
21ff088
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-java …
FabianMeiswinkel Sep 29, 2020
31c6035
Fixing VI failure unrelated to my changes
FabianMeiswinkel Sep 29, 2020
b3368ad
Update log4j2.properties
FabianMeiswinkel Sep 29, 2020
0ac49b1
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-java …
FabianMeiswinkel Sep 29, 2020
97f9527
Fixing unit test flakiness
FabianMeiswinkel Sep 30, 2020
157db1c
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-java …
FabianMeiswinkel Sep 30, 2020
9cefd30
Merging with Mo's Diagnostics PR
FabianMeiswinkel Sep 30, 2020
41ecf26
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-java …
FabianMeiswinkel Sep 30, 2020
45cfea1
Moving the address refresh to a background task for writes hitting 410
FabianMeiswinkel Sep 30, 2020
6ad514f
Addressed Kushagra's PR comments
FabianMeiswinkel Oct 1, 2020
4fac63e
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-java …
FabianMeiswinkel Oct 1, 2020
0e14727
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-java …
FabianMeiswinkel Oct 1, 2020
6f0b142
Addressing benchmark flakiness introduced due to not retrying writes …
FabianMeiswinkel Oct 1, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ final class Utility {
private final static Logger LOGGER = LoggerFactory.getLogger(Main.class);

public static void traceInformation(String payload) {
System.out.println(payload);
LOGGER.info(payload);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# This is the log4j configuration for benchmarks
# Set root logger level to INFO and its default appender to be 'STDOUT'.
rootLogger.level = info
rootLogger.appenderRef.stdout.ref = STDOUT

# Uncomment here and lines 21 - 25 to enable logging to a file as well.
#rootLogger.appenderRef.logFile.ref = FILE

property.logDirectory = $${sys:azure.cosmos.logger.directory}
property.hostName = $${sys:azure.cosmos.hostname}

logger.netty.name = io.netty
logger.netty.level = off

# STDOUT is a ConsoleAppender and uses PatternLayout.
appender.console.name = STDOUT
appender.console.type = Console
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d %5X{pid} [%t] %-5p %c - %m%n

#appender.logfile.name = FILE
#appender.logfile.type = File
#appender.logfile.filename = ${logDirectory}/azure-cosmos-dotnet-benchmark.log
#appender.logfile.layout.type = PatternLayout
#appender.logfile.layout.pattern = [%d][%p][${hostName}][thread:%t][logger:%c] %m%n
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,16 @@ public static <E extends CosmosException> E setPartitionKeyRangeId(E e, String p
return e;
}

@Warning(value = INTERNAL_USE_ONLY_WARNING)
public static <E extends CosmosException> boolean hasSendingRequestStarted(E e) {
return e.hasSendingRequestStarted();
}

@Warning(value = INTERNAL_USE_ONLY_WARNING)
public static <E extends CosmosException> void setSendingRequestStarted(E e, boolean hasSendingRequestStarted) {
e.setSendingRequestHasStarted(hasSendingRequestStarted);
}

@Warning(value = INTERNAL_USE_ONLY_WARNING)
public static boolean isEnableMultipleWriteLocations(DatabaseAccount account) {
return account.getEnableMultipleWriteLocations();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,22 +59,14 @@ public class CosmosException extends AzureException {
private int rntbdPendingRequestQueueSize;
private int rntbdRequestLength;
private int rntbdResponseLength;
private boolean sendingRequestHasStarted;

protected CosmosException(int statusCode, String message, Map<String, String> responseHeaders, Throwable cause) {
super(message, cause);
this.statusCode = statusCode;
this.responseHeaders = responseHeaders == null ? new HashMap<>() : new HashMap<>(responseHeaders);
}

/**
* Creates a new instance of the CosmosException class.
*
* @param statusCode the http status code of the response.
*/
CosmosException(int statusCode) {
this(statusCode, null, null, null);
}

/**
* Creates a new instance of the CosmosException class.
*
Expand Down Expand Up @@ -126,6 +118,26 @@ protected CosmosException(String resourceAddress,
this.cosmosError = cosmosErrorResource;
}

/**
* Creates a new instance of the CosmosException class.
*
* @param resourceAddress the address of the resource the request is associated with.
* @param statusCode the http status code of the response.
* @param cosmosErrorResource the error resource object.
* @param responseHeaders the response headers.
* @param cause the inner exception
*/

protected CosmosException(String resourceAddress,
int statusCode,
CosmosError cosmosErrorResource,
Map<String, String> responseHeaders,
Throwable cause) {
this(statusCode, cosmosErrorResource == null ? null : cosmosErrorResource.getMessage(), responseHeaders, cause);
this.resourceAddress = resourceAddress;
this.cosmosError = cosmosErrorResource;
}

/**
* Creates a new instance of the CosmosException class.
*
Expand Down Expand Up @@ -349,6 +361,14 @@ int getRequestPayloadLength() {
return this.requestPayloadLength;
}

boolean hasSendingRequestStarted() {
return this.sendingRequestHasStarted;
}

void setSendingRequestHasStarted(boolean hasSendingRequestStarted) {
this.sendingRequestHasStarted = hasSendingRequestStarted;
}

int getRntbdChannelTaskQueueSize() {
return this.rntbdChannelTaskQueueSize;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the MIT License.
package com.azure.cosmos.implementation;

import com.azure.cosmos.implementation.directconnectivity.AddressSelector;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand Down Expand Up @@ -51,15 +52,22 @@ static public <T> Flux<T> fluxExecuteRetry(Callable<Flux<T>> callbackMethod, IRe
}

static public <T> Mono<T> executeAsync(
Function<Quadruple<Boolean, Boolean, Duration, Integer>, Mono<T>> callbackMethod, IRetryPolicy retryPolicy,
Function<Quadruple<Boolean, Boolean, Duration, Integer>, Mono<T>> inBackoffAlternateCallbackMethod,
Duration minBackoffForInBackoffCallback,
RxDocumentServiceRequest request) {
Function<Quadruple<Boolean, Boolean, Duration, Integer>, Mono<T>> callbackMethod, IRetryPolicy retryPolicy,
Function<Quadruple<Boolean, Boolean, Duration, Integer>, Mono<T>> inBackoffAlternateCallbackMethod,
Duration minBackoffForInBackoffCallback,
RxDocumentServiceRequest request,
AddressSelector addressSelector) {

return Mono.defer(() -> {
// TODO: is defer required?
return callbackMethod.apply(InitialArgumentValuePolicyArg).onErrorResume(
RetryUtils.toRetryWithAlternateFunc(callbackMethod, retryPolicy, inBackoffAlternateCallbackMethod, minBackoffForInBackoffCallback, request));
RetryUtils.toRetryWithAlternateFunc(
callbackMethod,
retryPolicy,
inBackoffAlternateCallbackMethod,
minBackoffForInBackoffCallback,
request,
addressSelector));
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.azure.cosmos.implementation.directconnectivity.HttpUtils;
import com.azure.cosmos.implementation.http.HttpHeaders;

import java.net.SocketAddress;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -48,6 +49,22 @@ public GoneException(CosmosError cosmosError, long lsn, String partitionKeyRange
BridgeInternal.setPartitionKeyRangeId(this, partitionKeyRangeId);
}

/**
* Instantiates a new Gone exception.
*
* @param cosmosError the cosmos error
* @param lsn the lsn
* @param partitionKeyRangeId the partition key range id
* @param responseHeaders the response headers
*
*/
public GoneException(String resourceAddress, CosmosError cosmosError, long lsn, String partitionKeyRangeId,
Map<String, String> responseHeaders, Throwable cause) {
super(resourceAddress, HttpConstants.StatusCodes.GONE, cosmosError, responseHeaders, cause);
BridgeInternal.setLSN(this, lsn);
BridgeInternal.setPartitionKeyRangeId(this, partitionKeyRangeId);
}

/**
* Instantiates a new Gone exception.
*
Expand All @@ -58,17 +75,15 @@ public GoneException(String message, String requestUri) {
this(message, null, new HashMap<>(), requestUri);
}

GoneException(String message,
Exception innerException,
URI requestUri,
String localIpAddress) {
this(message(localIpAddress, message), innerException, null, requestUri);
}

GoneException(Exception innerException) {
this(RMResources.Gone, innerException, new HashMap<>(), null);
}

// Used via reflection from unit tests
GoneException(String message, HttpHeaders headers, String requestUriString) {
super(message, null, HttpUtils.asMap(headers), HttpConstants.StatusCodes.GONE, requestUriString);
}

/**
* Instantiates a new Gone exception.
*
Expand All @@ -82,8 +97,20 @@ public GoneException(String message, HttpHeaders headers, URI requestUrl) {
: null);
}

GoneException(String message, HttpHeaders headers, String requestUriString) {
super(message, null, HttpUtils.asMap(headers), HttpConstants.StatusCodes.GONE, requestUriString);
/**
* Instantiates a new Gone exception.
*
* @param message the message
* @param headers the headers
* @param remoteAddress the remote address
*/
public GoneException(String message, HttpHeaders headers, SocketAddress remoteAddress) {
super(
message,
null,
HttpUtils.asMap(headers),
HttpConstants.StatusCodes.GONE,
remoteAddress != null ? remoteAddress.toString() : null);
}

/**
Expand Down Expand Up @@ -121,19 +148,4 @@ public GoneException(String message,
String requestUriString) {
super(message, innerException, headers, HttpConstants.StatusCodes.GONE, requestUriString);
}

GoneException(CosmosError cosmosError, Map<String, String> headers) {
super(HttpConstants.StatusCodes.GONE, cosmosError, headers);
}

private static String message(String localIP, String baseMessage) {
if (!Strings.isNullOrEmpty(localIP)) {
return String.format(
RMResources.ExceptionMessageAddIpAddress,
baseMessage,
localIP);
}

return baseMessage;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,14 @@ public static ShouldRetryResult noRetry() {
return new ShouldRetryResult(null, null, false, null);
}

public static ShouldRetryResult noRetry(Quadruple<Boolean, Boolean, Duration, Integer> policyArg) {
return new ShouldRetryResult(
null,
null,
false,
policyArg);
}

public void throwIfDoneTrying(Exception capturedException) throws Exception {
if (this.shouldRetry) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,6 @@ public RequestTimeoutException(String message, URI requestUri) {
this(message, null, null, requestUri);
}

RequestTimeoutException(String message,
Exception innerException,
URI requestUri,
String localIpAddress) {
this(message(localIpAddress, message), innerException, null, requestUri);
}

RequestTimeoutException(Exception innerException) {
this(RMResources.Gone, innerException, (HttpHeaders) null, null);
}

/**
* Instantiates a new Request timeout exception.
*
Expand Down Expand Up @@ -94,6 +83,7 @@ public RequestTimeoutException(String message, HttpHeaders headers, SocketAddres
: null);
}

// Used via reflection from unit tests
RequestTimeoutException(String message, HttpHeaders headers, String requestUriString) {
super(message, null, HttpUtils.asMap(headers), HttpConstants.StatusCodes.REQUEST_TIMEOUT, requestUriString);
}
Expand All @@ -105,15 +95,4 @@ public RequestTimeoutException(String message, HttpHeaders headers, SocketAddres
super(message, innerException, HttpUtils.asMap(headers), HttpConstants.StatusCodes.REQUEST_TIMEOUT,
requestUrl != null ? requestUrl.toString() : null);
}

private static String message(String localIP, String baseMessage) {
if (!Strings.isNullOrEmpty(localIP)) {
return String.format(
RMResources.ExceptionMessageAddIpAddress,
baseMessage,
localIP);
}

return baseMessage;
}
}
Loading