From e3f011f8a38afa4585b2277a4a9d1da80162dfde Mon Sep 17 00:00:00 2001 From: Fabian Meiswinkel Date: Fri, 25 Sep 2020 22:13:29 +0000 Subject: [PATCH 01/12] Fixing exception handling for Gone and Request timeouts --- .../cosmos/dotnet/benchmark/Utility.java | 1 - .../src/main/resources/log4j2.properties | 25 ++++++++ .../java/com/azure/cosmos/BridgeInternal.java | 10 ++++ .../com/azure/cosmos/CosmosException.java | 38 +++++++++--- .../cosmos/implementation/GoneException.java | 60 +++++++++++-------- .../RequestTimeoutException.java | 23 +------ .../RxDocumentServiceRequest.java | 4 ++ .../GoneAndRetryWithRetryPolicy.java | 19 +++++- .../RntbdTransportClient.java | 3 +- .../rntbd/FailFastRntbdRequestRecord.java | 12 +++- .../rntbd/RntbdRequestManager.java | 9 ++- .../rntbd/RntbdRequestRecord.java | 18 +++++- 12 files changed, 155 insertions(+), 67 deletions(-) create mode 100644 sdk/cosmos/azure-cosmos-dotnet-benchmark/src/main/resources/log4j2.properties diff --git a/sdk/cosmos/azure-cosmos-dotnet-benchmark/src/main/java/com/azure/cosmos/dotnet/benchmark/Utility.java b/sdk/cosmos/azure-cosmos-dotnet-benchmark/src/main/java/com/azure/cosmos/dotnet/benchmark/Utility.java index cb08e7c7b994..c92e3cfa94aa 100644 --- a/sdk/cosmos/azure-cosmos-dotnet-benchmark/src/main/java/com/azure/cosmos/dotnet/benchmark/Utility.java +++ b/sdk/cosmos/azure-cosmos-dotnet-benchmark/src/main/java/com/azure/cosmos/dotnet/benchmark/Utility.java @@ -10,7 +10,6 @@ final class Utility { private final static Logger LOGGER = LoggerFactory.getLogger(Main.class); public static void traceInformation(String payload) { - System.out.println(payload); LOGGER.info(payload); } } \ No newline at end of file diff --git a/sdk/cosmos/azure-cosmos-dotnet-benchmark/src/main/resources/log4j2.properties b/sdk/cosmos/azure-cosmos-dotnet-benchmark/src/main/resources/log4j2.properties new file mode 100644 index 000000000000..5955fd5703b6 --- /dev/null +++ 
b/sdk/cosmos/azure-cosmos-dotnet-benchmark/src/main/resources/log4j2.properties @@ -0,0 +1,25 @@ +# This is the log4j configuration for benchmarks +# Set root logger level to INFO and its default appender to be 'STDOUT'. +rootLogger.level = info +rootLogger.appenderRef.stdout.ref = STDOUT + +# Uncomment here and lines 21 - 25 to enable logging to a file as well. +#rootLogger.appenderRef.logFile.ref = FILE + +property.logDirectory = $${sys:azure.cosmos.logger.directory} +property.hostName = $${sys:azure.cosmos.hostname} + +logger.netty.name = io.netty +logger.netty.level = off + +# STDOUT is a ConsoleAppender and uses PatternLayout. +appender.console.name = STDOUT +appender.console.type = Console +appender.console.layout.type = PatternLayout +appender.console.layout.pattern = %d %5X{pid} [%t] %-5p %c - %m%n + +#appender.logfile.name = FILE +#appender.logfile.type = File +#appender.logfile.filename = ${logDirectory}/azure-cosmos-benchmark.log +#appender.logfile.layout.type = PatternLayout +#appender.logfile.layout.pattern = [%d][%p][${hostName}][thread:%t][logger:%c] %m%n diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/BridgeInternal.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/BridgeInternal.java index 7954b4891fcc..8a4c40432390 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/BridgeInternal.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/BridgeInternal.java @@ -210,6 +210,16 @@ public static E setPartitionKeyRangeId(E e, String p return e; } + @Warning(value = INTERNAL_USE_ONLY_WARNING) + public static boolean hasSendingRequestStarted(E e) { + return e.hasSendingRequestStarted(); + } + + @Warning(value = INTERNAL_USE_ONLY_WARNING) + public static void setSendingRequestStarted(E e, boolean hasSendingRequestStarted) { + e.setSendingRequestHasStarted(hasSendingRequestStarted); + } + @Warning(value = INTERNAL_USE_ONLY_WARNING) public static boolean isEnableMultipleWriteLocations(DatabaseAccount 
account) { return account.getEnableMultipleWriteLocations(); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosException.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosException.java index 4432fd28a788..0318628ede12 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosException.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosException.java @@ -55,6 +55,7 @@ public class CosmosException extends AzureException { private int requestPayloadLength; private int rntbdRequestLength; private int rntbdResponseLength; + private boolean sendingRequestHasStarted; protected CosmosException(int statusCode, String message, Map responseHeaders, Throwable cause) { super(message, cause); @@ -62,15 +63,6 @@ protected CosmosException(int statusCode, String message, Map re this.responseHeaders = responseHeaders == null ? new HashMap<>() : new HashMap<>(responseHeaders); } - /** - * Creates a new instance of the CosmosException class. - * - * @param statusCode the http status code of the response. - */ - CosmosException(int statusCode) { - this(statusCode, null, null, null); - } - /** * Creates a new instance of the CosmosException class. * @@ -122,6 +114,26 @@ protected CosmosException(String resourceAddress, this.cosmosError = cosmosErrorResource; } + /** + * Creates a new instance of the CosmosException class. + * + * @param resourceAddress the address of the resource the request is associated with. + * @param statusCode the http status code of the response. + * @param cosmosErrorResource the error resource object. + * @param responseHeaders the response headers. + * @param cause the inner exception + */ + + protected CosmosException(String resourceAddress, + int statusCode, + CosmosError cosmosErrorResource, + Map responseHeaders, + Throwable cause) { + this(statusCode, cosmosErrorResource == null ? 
null : cosmosErrorResource.getMessage(), responseHeaders, cause); + this.resourceAddress = resourceAddress; + this.cosmosError = cosmosErrorResource; + } + /** * Creates a new instance of the CosmosException class. * @@ -336,4 +348,12 @@ void setRequestPayloadLength(int requestBodyLength) { int getRequestPayloadLength() { return this.requestPayloadLength; } + + boolean hasSendingRequestStarted() { + return this.sendingRequestHasStarted; + } + + void setSendingRequestHasStarted(boolean hasSendingRequestStarted) { + this.sendingRequestHasStarted = hasSendingRequestStarted; + } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GoneException.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GoneException.java index 4f210ec3139d..ddac13e48205 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GoneException.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GoneException.java @@ -8,6 +8,7 @@ import com.azure.cosmos.implementation.directconnectivity.HttpUtils; import com.azure.cosmos.implementation.http.HttpHeaders; +import java.net.SocketAddress; import java.net.URI; import java.util.HashMap; import java.util.Map; @@ -48,6 +49,22 @@ public GoneException(CosmosError cosmosError, long lsn, String partitionKeyRange BridgeInternal.setPartitionKeyRangeId(this, partitionKeyRangeId); } + /** + * Instantiates a new Gone exception. 
+ * + * @param cosmosError the cosmos error + * @param lsn the lsn + * @param partitionKeyRangeId the partition key range id + * @param responseHeaders the response headers + * + */ + public GoneException(String resourceAddress, CosmosError cosmosError, long lsn, String partitionKeyRangeId, + Map responseHeaders, Throwable cause) { + super(resourceAddress, HttpConstants.StatusCodes.GONE, cosmosError, responseHeaders, cause); + BridgeInternal.setLSN(this, lsn); + BridgeInternal.setPartitionKeyRangeId(this, partitionKeyRangeId); + } + /** * Instantiates a new Gone exception. * @@ -58,17 +75,15 @@ public GoneException(String message, String requestUri) { this(message, null, new HashMap<>(), requestUri); } - GoneException(String message, - Exception innerException, - URI requestUri, - String localIpAddress) { - this(message(localIpAddress, message), innerException, null, requestUri); - } - GoneException(Exception innerException) { this(RMResources.Gone, innerException, new HashMap<>(), null); } + // Used via reflection from unit tests + GoneException(String message, HttpHeaders headers, String requestUriString) { + super(message, null, HttpUtils.asMap(headers), HttpConstants.StatusCodes.GONE, requestUriString); + } + /** * Instantiates a new Gone exception. * @@ -82,8 +97,20 @@ public GoneException(String message, HttpHeaders headers, URI requestUrl) { : null); } - GoneException(String message, HttpHeaders headers, String requestUriString) { - super(message, null, HttpUtils.asMap(headers), HttpConstants.StatusCodes.GONE, requestUriString); + /** + * Instantiates a new Gone exception. + * + * @param message the message + * @param headers the headers + * @param remoteAddress the remote address + */ + public GoneException(String message, HttpHeaders headers, SocketAddress remoteAddress) { + super( + message, + null, + HttpUtils.asMap(headers), + HttpConstants.StatusCodes.GONE, + remoteAddress != null ? 
remoteAddress.toString() : null); } /** @@ -121,19 +148,4 @@ public GoneException(String message, String requestUriString) { super(message, innerException, headers, HttpConstants.StatusCodes.GONE, requestUriString); } - - GoneException(CosmosError cosmosError, Map headers) { - super(HttpConstants.StatusCodes.GONE, cosmosError, headers); - } - - private static String message(String localIP, String baseMessage) { - if (!Strings.isNullOrEmpty(localIP)) { - return String.format( - RMResources.ExceptionMessageAddIpAddress, - baseMessage, - localIP); - } - - return baseMessage; - } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RequestTimeoutException.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RequestTimeoutException.java index f18c75bb61dc..510781c74b59 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RequestTimeoutException.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RequestTimeoutException.java @@ -49,17 +49,6 @@ public RequestTimeoutException(String message, URI requestUri) { this(message, null, null, requestUri); } - RequestTimeoutException(String message, - Exception innerException, - URI requestUri, - String localIpAddress) { - this(message(localIpAddress, message), innerException, null, requestUri); - } - - RequestTimeoutException(Exception innerException) { - this(RMResources.Gone, innerException, (HttpHeaders) null, null); - } - /** * Instantiates a new Request timeout exception. 
* @@ -94,6 +83,7 @@ public RequestTimeoutException(String message, HttpHeaders headers, SocketAddres : null); } + // Used via reflection from unit tests RequestTimeoutException(String message, HttpHeaders headers, String requestUriString) { super(message, null, HttpUtils.asMap(headers), HttpConstants.StatusCodes.REQUEST_TIMEOUT, requestUriString); } @@ -105,15 +95,4 @@ public RequestTimeoutException(String message, HttpHeaders headers, SocketAddres super(message, innerException, HttpUtils.asMap(headers), HttpConstants.StatusCodes.REQUEST_TIMEOUT, requestUrl != null ? requestUrl.toString() : null); } - - private static String message(String localIP, String baseMessage) { - if (!Strings.isNullOrEmpty(localIP)) { - return String.format( - RMResources.ExceptionMessageAddIpAddress, - baseMessage, - localIP); - } - - return baseMessage; - } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceRequest.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceRequest.java index d797b69ec387..bfe7f00577dd 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceRequest.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceRequest.java @@ -83,6 +83,10 @@ public boolean isReadOnlyScript() { } } + public boolean isReadOnly() { + return this.isReadOnlyRequest() || this.isReadOnlyScript(); + } + /** * @param operationType the operation type. * @param resourceIdOrFullName the request id or full name. 
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GoneAndRetryWithRetryPolicy.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GoneAndRetryWithRetryPolicy.java index f0f65b61be3a..07963563e1af 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GoneAndRetryWithRetryPolicy.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GoneAndRetryWithRetryPolicy.java @@ -60,12 +60,25 @@ public Mono shouldRetry(Exception exception) { !(exception instanceof RetryWithException) && !(exception instanceof PartitionIsMigratingException) && !(exception instanceof InvalidPartitionException && - (this.request.getPartitionKeyRangeIdentity() == null || - this.request.getPartitionKeyRangeIdentity().getCollectionRid() == null)) && + (this.request.getPartitionKeyRangeIdentity() == null || + this.request.getPartitionKeyRangeIdentity().getCollectionRid() == null)) && !(exception instanceof PartitionKeyRangeIsSplittingException)) { + logger.debug("Operation will NOT be retried. Current attempt {}, Exception: ", this.attemptCount, - exception); + exception); + stopStopWatch(this.durationTimer); + return Mono.just(ShouldRetryResult.noRetry()); + } else if (exception instanceof GoneException && + !request.isReadOnly() && + BridgeInternal.hasSendingRequestStarted((CosmosException)exception)) { + + logger.debug( + "Operation will NOT be retried. Write operations can not be retried safely when sending the request " + + "to the service because they aren't idempotent. 
Current attempt {}, Exception: ", + this.attemptCount, + exception); stopStopWatch(this.durationTimer); + this.request.requestContext.forceRefreshAddressCache = true; return Mono.just(ShouldRetryResult.noRetry()); } else if (exception instanceof RetryWithException) { this.lastRetryWithException = (RetryWithException) exception; diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClient.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClient.java index b32252d953d0..d627d8128820 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClient.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClient.java @@ -196,6 +196,7 @@ public Mono invokeStoreAsync(final Uri addressUri, final RxDocume BridgeInternal.setRntbdResponseLength(cosmosException, record.responseLength()); BridgeInternal.setRequestBodyLength(cosmosException, request.getContentLength()); BridgeInternal.setRequestTimeline(cosmosException, record.takeTimelineSnapshot()); + BridgeInternal.setSendingRequestStarted(cosmosException, record.hasSendingRequestStarted()); return cosmosException; }); @@ -215,7 +216,7 @@ public Mono invokeStoreAsync(final Uri addressUri, final RxDocume // messages due to CompletionException errors. Anecdotal evidence shows that this is more likely to be seen // in low latency environments on Azure cloud. To avoid the onErrorDropped events to get logged in the // default hook (which logs with level ERROR) we inject a local hook in the Reactor Context to just log it - // as DEBUG level fro the lifecycle of this Mono (safe here because we know the onErrorDropped doesn't have + // as DEBUG level for the lifecycle of this Mono (safe here because we know the onErrorDropped doesn't have // any functional issues. 
// // One might be tempted to complete a pending request here, but that is ill advised. Testing and diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/FailFastRntbdRequestRecord.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/FailFastRntbdRequestRecord.java index deddb2420142..ce894089e1ae 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/FailFastRntbdRequestRecord.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/FailFastRntbdRequestRecord.java @@ -4,6 +4,7 @@ package com.azure.cosmos.implementation.directconnectivity.rntbd; import com.azure.cosmos.BridgeInternal; +import com.azure.cosmos.implementation.GoneException; import com.azure.cosmos.implementation.HttpConstants; import com.azure.cosmos.implementation.RequestTimeoutException; import com.azure.cosmos.implementation.http.HttpHeaders; @@ -49,16 +50,21 @@ public static FailFastRntbdRequestRecord createAndFailFast( concurrentRequestsSnapshot); final HttpHeaders headers = new HttpHeaders(); headers.set(HttpConstants.HttpHeaders.ACTIVITY_ID, failFastRecord.activityId().toString()); - final RequestTimeoutException requestTimeoutException = new RequestTimeoutException( + + // When admission control blocks a request due to excessive pendingAcquisition queue length + // the error should be handled upstream as a transient connectivity error for which we know + // the request was never flushed to the wire - which means retries are functionally safe for both + // reads and writes + final GoneException admissionControlBlocksRequestException = new GoneException( reason, headers, remoteAddress); - BridgeInternal.setRequestHeaders(requestTimeoutException, args.serviceRequest().getHeaders()); + BridgeInternal.setRequestHeaders(admissionControlBlocksRequestException, args.serviceRequest().getHeaders()); 
failFastRecord.whenComplete((response, error) -> { metrics.markComplete(failFastRecord); }); - failFastRecord.completeExceptionally(requestTimeoutException); + failFastRecord.completeExceptionally(admissionControlBlocksRequestException); return failFastRecord; } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestManager.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestManager.java index 06400a260ce7..7ae5759eb678 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestManager.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestManager.java @@ -492,6 +492,7 @@ public void write(final ChannelHandlerContext context, final Object message, fin final RntbdRequestRecord record = (RntbdRequestRecord) message; this.timestamps.channelWriteAttempted(); + record.setSendingRequestHasStarted(); context.write(this.addPendingRequestRecord(context, record), promise).addListener(completed -> { record.stage(RntbdRequestRecord.Stage.SENT); @@ -670,7 +671,7 @@ private void completeAllPendingRequestsExceptionally( final Map requestHeaders = record.args().serviceRequest().getHeaders(); final String requestUri = record.args().physicalAddress().toString(); - final GoneException error = new GoneException(message, cause, (Map) null, requestUri); + final GoneException error = new GoneException(message, cause, null, requestUri); BridgeInternal.setRequestHeaders(error, requestHeaders); record.completeExceptionally(error); @@ -798,7 +799,11 @@ private void messageReceived(final ChannelHandlerContext context, final RntbdRes break; case StatusCodes.REQUEST_TIMEOUT: - cause = new RequestTimeoutException(error, lsn, partitionKeyRangeId, responseHeaders); + Exception inner = new RequestTimeoutException(error, lsn, partitionKeyRangeId, 
responseHeaders); + String resourceAddress = requestRecord.args().physicalAddress() != null ? + requestRecord.args().physicalAddress().toString() : null; + + cause = new GoneException(resourceAddress, error, lsn, partitionKeyRangeId, responseHeaders, inner); break; case StatusCodes.RETRY_WITH: diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestRecord.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestRecord.java index d749e9146036..7b0911438dad 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestRecord.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestRecord.java @@ -4,8 +4,8 @@ package com.azure.cosmos.implementation.directconnectivity.rntbd; import com.azure.cosmos.BridgeInternal; +import com.azure.cosmos.implementation.GoneException; import com.azure.cosmos.implementation.RequestTimeline; -import com.azure.cosmos.implementation.RequestTimeoutException; import com.azure.cosmos.implementation.directconnectivity.StoreResponse; import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.databind.SerializerProvider; @@ -58,6 +58,7 @@ public abstract class RntbdRequestRecord extends CompletableFuture Date: Fri, 25 Sep 2020 22:48:09 +0000 Subject: [PATCH 02/12] Adding basic unit tests --- .../GoneAndRetryWithRetryPolicyTest.java | 114 +++++++++++++++++- 1 file changed, 112 insertions(+), 2 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GoneAndRetryWithRetryPolicyTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GoneAndRetryWithRetryPolicyTest.java index 895a6d5fc865..5704466c3f80 100644 --- 
a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GoneAndRetryWithRetryPolicyTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GoneAndRetryWithRetryPolicyTest.java @@ -3,6 +3,7 @@ package com.azure.cosmos.implementation.directconnectivity; +import com.azure.cosmos.BridgeInternal; import com.azure.cosmos.implementation.BadRequestException; import com.azure.cosmos.CosmosException; import com.azure.cosmos.implementation.GoneException; @@ -12,8 +13,10 @@ import com.azure.cosmos.implementation.HttpConstants; import com.azure.cosmos.implementation.IRetryPolicy; import com.azure.cosmos.implementation.OperationType; +import com.azure.cosmos.implementation.RequestTimeoutException; import com.azure.cosmos.implementation.ResourceType; import com.azure.cosmos.implementation.RxDocumentServiceRequest; +import com.azure.cosmos.implementation.guava25.base.Supplier; import org.testng.annotations.Test; import reactor.core.publisher.Mono; @@ -27,11 +30,11 @@ public class GoneAndRetryWithRetryPolicyTest { protected static final int TIMEOUT = 60000; /** - * Retry with GoneException , retried 4 times and verified the returned + * Retry with GoneException for read, retried 4 times and verified the returned * shouldRetryResult. 
ShouldRetryResult */ @Test(groups = { "unit" }, timeOut = TIMEOUT) - public void shouldRetryWithGoneException() { + public void shouldRetryReadWithGoneException() { RxDocumentServiceRequest request = RxDocumentServiceRequest.create(OperationType.Read, ResourceType.Document); GoneAndRetryWithRetryPolicy goneAndRetryWithRetryPolicy = new GoneAndRetryWithRetryPolicy(request, 30); Mono singleShouldRetry = goneAndRetryWithRetryPolicy @@ -62,7 +65,114 @@ public void shouldRetryWithGoneException() { assertThat(shouldRetryResult.policyArg.getValue0()).isTrue(); assertThat(shouldRetryResult.policyArg.getValue3()).isEqualTo(4); assertThat(shouldRetryResult.backOffTime.getSeconds()).isEqualTo(4); + } + + /** + * Retry with GoneException for write which is not yet sent to the wire, + * retried 4 times and verified the returned + * shouldRetryResult. ShouldRetryResult + */ + @Test(groups = { "unit" }, timeOut = TIMEOUT) + public void shouldRetryNotYetFlushedWriteWithGoneException() { + RxDocumentServiceRequest request = RxDocumentServiceRequest.create(OperationType.Create, ResourceType.Document); + GoneAndRetryWithRetryPolicy goneAndRetryWithRetryPolicy = new GoneAndRetryWithRetryPolicy(request, 30); + + Supplier goneExceptionForNotYetFlushedRequestSupplier = () -> { + GoneException goneExceptionForNotYetFlushedRequest = new GoneException(); + BridgeInternal.setSendingRequestStarted(goneExceptionForNotYetFlushedRequest, false); + + return goneExceptionForNotYetFlushedRequest; + }; + Mono singleShouldRetry = goneAndRetryWithRetryPolicy + .shouldRetry(goneExceptionForNotYetFlushedRequestSupplier.get()); + IRetryPolicy.ShouldRetryResult shouldRetryResult = singleShouldRetry.block(); + assertThat(shouldRetryResult.shouldRetry).isTrue(); + assertThat(shouldRetryResult.policyArg.getValue0()).isTrue(); + assertThat(shouldRetryResult.policyArg.getValue3()).isEqualTo(1); + assertThat(shouldRetryResult.backOffTime.getSeconds()).isEqualTo(0); + + singleShouldRetry = 
goneAndRetryWithRetryPolicy.shouldRetry(goneExceptionForNotYetFlushedRequestSupplier.get()); + shouldRetryResult = singleShouldRetry.block(); + assertThat(shouldRetryResult.shouldRetry).isTrue(); + assertThat(shouldRetryResult.policyArg.getValue0()).isTrue(); + assertThat(shouldRetryResult.policyArg.getValue3()).isEqualTo(2); + assertThat(shouldRetryResult.backOffTime.getSeconds()).isEqualTo(1); + + singleShouldRetry = goneAndRetryWithRetryPolicy.shouldRetry(goneExceptionForNotYetFlushedRequestSupplier.get()); + shouldRetryResult = singleShouldRetry.block(); + assertThat(shouldRetryResult.shouldRetry).isTrue(); + assertThat(shouldRetryResult.policyArg.getValue0()).isTrue(); + assertThat(shouldRetryResult.policyArg.getValue3()).isEqualTo(3); + assertThat(shouldRetryResult.backOffTime.getSeconds()).isEqualTo(2); + + singleShouldRetry = goneAndRetryWithRetryPolicy.shouldRetry(goneExceptionForNotYetFlushedRequestSupplier.get()); + shouldRetryResult = singleShouldRetry.block(); + assertThat(shouldRetryResult.shouldRetry).isTrue(); + assertThat(shouldRetryResult.policyArg.getValue0()).isTrue(); + assertThat(shouldRetryResult.policyArg.getValue3()).isEqualTo(4); + assertThat(shouldRetryResult.backOffTime.getSeconds()).isEqualTo(4); + } + + /** + * GoneException for write which is already sent to the wire, should not result in retry, + * but the request context's forceRefreshAddressCache flag should still be set to true + * shouldRetryResult. 
ShouldRetryResult + */ + @Test(groups = { "unit" }, timeOut = TIMEOUT) + public void shouldNotRetryFlushedWriteWithGoneExceptionButForceAddressRefresh() { + RxDocumentServiceRequest request = RxDocumentServiceRequest.create(OperationType.Create, ResourceType.Document); + assertThat(request.requestContext.forceRefreshAddressCache).isFalse(); + + Supplier goneExceptionForFlushedRequestSupplier = () -> { + GoneException goneExceptionForFlushedRequest = new GoneException(); + BridgeInternal.setSendingRequestStarted(goneExceptionForFlushedRequest, true); + + return goneExceptionForFlushedRequest; + }; + + GoneAndRetryWithRetryPolicy goneAndRetryWithRetryPolicy = new GoneAndRetryWithRetryPolicy(request, 30); + Mono singleShouldRetry = goneAndRetryWithRetryPolicy + .shouldRetry(goneExceptionForFlushedRequestSupplier.get()); + IRetryPolicy.ShouldRetryResult shouldRetryResult = singleShouldRetry.block(); + + assertThat(shouldRetryResult.shouldRetry).isFalse(); + assertThat(request.requestContext.forceRefreshAddressCache).isTrue(); + assertThat(shouldRetryResult.policyArg).isNull(); + assertThat(shouldRetryResult.backOffTime).isNull(); + } + + /** + * RequestTimeoutExceptions should not be retried for read or write - no address cache refresh expected + * shouldRetryResult. 
ShouldRetryResult + */ + @Test(groups = { "unit" }, timeOut = TIMEOUT) + public void shouldNotRetryRequestTimeoutException() { + RxDocumentServiceRequest request = RxDocumentServiceRequest.create(OperationType.Read, ResourceType.Document); + assertThat(request.requestContext.forceRefreshAddressCache).isFalse(); + + GoneAndRetryWithRetryPolicy goneAndRetryWithRetryPolicy = new GoneAndRetryWithRetryPolicy(request, 30); + Mono singleShouldRetry = goneAndRetryWithRetryPolicy + .shouldRetry(new RequestTimeoutException()); + IRetryPolicy.ShouldRetryResult shouldRetryResult = singleShouldRetry.block(); + + assertThat(shouldRetryResult.shouldRetry).isFalse(); + assertThat(request.requestContext.forceRefreshAddressCache).isFalse(); + assertThat(shouldRetryResult.policyArg).isNull(); + assertThat(shouldRetryResult.backOffTime).isNull(); + + request = RxDocumentServiceRequest.create(OperationType.Create, ResourceType.Document); + assertThat(request.requestContext.forceRefreshAddressCache).isFalse(); + + goneAndRetryWithRetryPolicy = new GoneAndRetryWithRetryPolicy(request, 30); + singleShouldRetry = goneAndRetryWithRetryPolicy + .shouldRetry(new RequestTimeoutException()); + shouldRetryResult = singleShouldRetry.block(); + + assertThat(shouldRetryResult.shouldRetry).isFalse(); + assertThat(request.requestContext.forceRefreshAddressCache).isFalse(); + assertThat(shouldRetryResult.policyArg).isNull(); + assertThat(shouldRetryResult.backOffTime).isNull(); } /** From 07ac3ba6285abd909294cd64910c24da2f94baa0 Mon Sep 17 00:00:00 2001 From: Fabian Meiswinkel Date: Tue, 29 Sep 2020 01:27:29 +0000 Subject: [PATCH 03/12] Really enforcing AddressRefresh after failing Gone for writes --- .../implementation/BackoffRetryUtility.java | 18 ++- .../cosmos/implementation/IRetryPolicy.java | 8 ++ .../cosmos/implementation/RetryUtils.java | 56 ++++++--- .../GoneAndRetryWithRetryPolicy.java | 9 +- .../ReplicatedResourceClient.java | 11 +- .../cosmos/RetryContextOnDiagnosticTest.java | 21 +++- 
.../cosmos/implementation/RetryUtilsTest.java | 9 +- .../AddressSelectorWrapper.java | 25 +++-- ...licatedResourceClientGoneForWriteTest.java | 106 ++++++++++++++++++ 9 files changed, 223 insertions(+), 40 deletions(-) create mode 100644 sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReplicatedResourceClientGoneForWriteTest.java diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/BackoffRetryUtility.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/BackoffRetryUtility.java index 6a9d496ccbc8..9426daa4422e 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/BackoffRetryUtility.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/BackoffRetryUtility.java @@ -2,6 +2,7 @@ // Licensed under the MIT License. package com.azure.cosmos.implementation; +import com.azure.cosmos.implementation.directconnectivity.AddressSelector; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -51,15 +52,22 @@ static public Flux fluxExecuteRetry(Callable> callbackMethod, IRe } static public Mono executeAsync( - Function, Mono> callbackMethod, IRetryPolicy retryPolicy, - Function, Mono> inBackoffAlternateCallbackMethod, - Duration minBackoffForInBackoffCallback, - RxDocumentServiceRequest request) { + Function, Mono> callbackMethod, IRetryPolicy retryPolicy, + Function, Mono> inBackoffAlternateCallbackMethod, + Duration minBackoffForInBackoffCallback, + RxDocumentServiceRequest request, + AddressSelector addressSelector) { return Mono.defer(() -> { // TODO: is defer required? 
return callbackMethod.apply(InitialArgumentValuePolicyArg).onErrorResume( - RetryUtils.toRetryWithAlternateFunc(callbackMethod, retryPolicy, inBackoffAlternateCallbackMethod, minBackoffForInBackoffCallback, request)); + RetryUtils.toRetryWithAlternateFunc( + callbackMethod, + retryPolicy, + inBackoffAlternateCallbackMethod, + minBackoffForInBackoffCallback, + request, + addressSelector)); }); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/IRetryPolicy.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/IRetryPolicy.java index 26aee94303ba..741f59b6c381 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/IRetryPolicy.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/IRetryPolicy.java @@ -81,6 +81,14 @@ public static ShouldRetryResult noRetry() { return new ShouldRetryResult(null, null, false, null); } + public static ShouldRetryResult noRetry(Quadruple policyArg) { + return new ShouldRetryResult( + null, + null, + false, + policyArg); + } + public void throwIfDoneTrying(Exception capturedException) throws Exception { if (this.shouldRetry) { return; diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RetryUtils.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RetryUtils.java index 46bf6bda1236..200bae6d056a 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RetryUtils.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RetryUtils.java @@ -4,6 +4,7 @@ import com.azure.cosmos.CosmosException; import com.azure.cosmos.implementation.apachecommons.lang.time.StopWatch; +import com.azure.cosmos.implementation.directconnectivity.AddressSelector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; @@ -61,7 +62,8 @@ public static Function> toRetryWithAlternateFunc(Function IRetryPolicy 
retryPolicy, Function, Mono> inBackoffAlternateCallbackMethod, Duration minBackoffForInBackoffCallback, - RxDocumentServiceRequest rxDocumentServiceRequest) { + RxDocumentServiceRequest rxDocumentServiceRequest, + AddressSelector addressSelector) { return throwable -> { if(rxDocumentServiceRequest.requestContext != null && retryPolicy.getRetryCount() > 0) { retryPolicy.updateEndTime(); @@ -82,11 +84,20 @@ public static Function> toRetryWithAlternateFunc(Function if (!shouldRetryResult.shouldRetry) { retryPolicy.updateEndTime(); - if(shouldRetryResult.exception == null) { - return Mono.error(e); - } else { - return Mono.error(shouldRetryResult.exception); + + final Throwable errorToReturn = shouldRetryResult.exception != null ? shouldRetryResult.exception : e; + final Mono failure = Mono.error(errorToReturn); + + if (shouldRetryResult.policyArg != null && + shouldRetryResult.policyArg.getValue0() != null && + shouldRetryResult.policyArg.getValue0()) { + + return (Mono)addressSelector.resolveAddressesAsync( + rxDocumentServiceRequest, + true).flatMap((dummy) -> failure); } + + return failure; } retryPolicy.incrementRetry(); if(rxDocumentServiceRequest.requestContext != null && retryPolicy.getRetryCount() > 0) { @@ -99,24 +110,41 @@ public static Function> toRetryWithAlternateFunc(Function StopWatch stopwatch = new StopWatch(); startStopWatch(stopwatch); return inBackoffAlternateCallbackMethod.apply(shouldRetryResult.policyArg) - .onErrorResume(recurrsiveWithAlternateFunc(callbackMethod, retryPolicy, + .onErrorResume(recursiveWithAlternateFunc(callbackMethod, retryPolicy, inBackoffAlternateCallbackMethod, shouldRetryResult, stopwatch, - minBackoffForInBackoffCallback, rxDocumentServiceRequest)); + minBackoffForInBackoffCallback, rxDocumentServiceRequest, addressSelector)); } else { - return recurrsiveFunc(callbackMethod, retryPolicy, inBackoffAlternateCallbackMethod, - shouldRetryResult, minBackoffForInBackoffCallback, rxDocumentServiceRequest) + return 
recursiveFunc(callbackMethod, retryPolicy, inBackoffAlternateCallbackMethod, + shouldRetryResult, minBackoffForInBackoffCallback, rxDocumentServiceRequest, addressSelector) .delaySubscription(Duration.ofMillis(shouldRetryResult.backOffTime.toMillis())); } }); }; } - private static Mono recurrsiveFunc(Function, Mono> callbackMethod, IRetryPolicy retryPolicy, Function, Mono> inBackoffAlternateCallbackMethod, IRetryPolicy.ShouldRetryResult shouldRetryResult, Duration minBackoffForInBackoffCallback, RxDocumentServiceRequest rxDocumentServiceRequest) { + private static Mono recursiveFunc( + Function, Mono> callbackMethod, + IRetryPolicy retryPolicy, + Function, Mono> inBackoffAlternateCallbackMethod, + IRetryPolicy.ShouldRetryResult shouldRetryResult, + Duration minBackoffForInBackoffCallback, + RxDocumentServiceRequest rxDocumentServiceRequest, + AddressSelector addressSelector) { + return callbackMethod.apply(shouldRetryResult.policyArg).onErrorResume(toRetryWithAlternateFunc( - callbackMethod, retryPolicy, inBackoffAlternateCallbackMethod, minBackoffForInBackoffCallback, rxDocumentServiceRequest)); + callbackMethod, retryPolicy, inBackoffAlternateCallbackMethod, minBackoffForInBackoffCallback, rxDocumentServiceRequest, addressSelector)); } - private static Function> recurrsiveWithAlternateFunc(Function, Mono> callbackMethod, IRetryPolicy retryPolicy, Function, Mono> inBackoffAlternateCallbackMethod, IRetryPolicy.ShouldRetryResult shouldRetryResult, StopWatch stopwatch, Duration minBackoffForInBackoffCallback,RxDocumentServiceRequest rxDocumentServiceRequest) { + private static Function> recursiveWithAlternateFunc( + Function, Mono> callbackMethod, + IRetryPolicy retryPolicy, + Function, Mono> inBackoffAlternateCallbackMethod, + IRetryPolicy.ShouldRetryResult shouldRetryResult, + StopWatch stopwatch, + Duration minBackoffForInBackoffCallback, + RxDocumentServiceRequest rxDocumentServiceRequest, + AddressSelector addressSelector) { + return throwable -> { Exception e = 
Utils.as(throwable, Exception.class); if (e == null) { @@ -129,8 +157,8 @@ private static Function> recurrsiveWithAlternateFunc(Func Duration backoffTime = shouldRetryResult.backOffTime.toMillis() > stopwatch.getTime() ? Duration.ofMillis(shouldRetryResult.backOffTime.toMillis() - stopwatch.getTime()) : Duration.ZERO; - return recurrsiveFunc(callbackMethod, retryPolicy, inBackoffAlternateCallbackMethod, shouldRetryResult, - minBackoffForInBackoffCallback, rxDocumentServiceRequest) + return recursiveFunc(callbackMethod, retryPolicy, inBackoffAlternateCallbackMethod, shouldRetryResult, + minBackoffForInBackoffCallback, rxDocumentServiceRequest, addressSelector) .delaySubscription(Flux.just(0L).delayElements(Duration.ofMillis(backoffTime.toMillis()))); }; } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GoneAndRetryWithRetryPolicy.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GoneAndRetryWithRetryPolicy.java index 07963563e1af..0f0d04dab814 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GoneAndRetryWithRetryPolicy.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GoneAndRetryWithRetryPolicy.java @@ -78,8 +78,9 @@ public Mono shouldRetry(Exception exception) { this.attemptCount, exception); stopStopWatch(this.durationTimer); - this.request.requestContext.forceRefreshAddressCache = true; - return Mono.just(ShouldRetryResult.noRetry()); + + return Mono.just(ShouldRetryResult.noRetry( + Quadruple.with(true, true, Duration.ofMillis(0), this.attemptCount))); } else if (exception instanceof RetryWithException) { this.lastRetryWithException = (RetryWithException) exception; } @@ -134,7 +135,7 @@ public Mono shouldRetry(Exception exception) { backoffTime = Duration.ofSeconds(Math.min(Math.min(this.currentBackoffSeconds, remainingSeconds), 
GoneAndRetryWithRetryPolicy.MAXIMUM_BACKOFF_TIME_IN_SECONDS)); this.currentBackoffSeconds *= GoneAndRetryWithRetryPolicy.BACK_OFF_MULTIPLIER; - logger.info("BackoffTime: {} seconds.", backoffTime.getSeconds()); + logger.debug("BackoffTime: {} seconds.", backoffTime.getSeconds()); } // Calculate the remaining time based after accounting for the backoff that we @@ -143,7 +144,7 @@ public Mono shouldRetry(Exception exception) { timeout = timeoutInMillSec > 0 ? Duration.ofMillis(timeoutInMillSec) : Duration.ofSeconds(GoneAndRetryWithRetryPolicy.MAXIMUM_BACKOFF_TIME_IN_SECONDS); if (exception instanceof GoneException) { - logger.info("Received gone exception, will retry, {}", exception.toString()); + logger.debug("Received gone exception, will retry, {}", exception.toString()); forceRefreshAddressCache = true; // indicate we are in retry. } else if (exception instanceof PartitionIsMigratingException) { logger.warn("Received PartitionIsMigratingException, will retry, {}", exception.toString()); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ReplicatedResourceClient.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ReplicatedResourceClient.java index bcba655226d7..b26e18bbe65d 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ReplicatedResourceClient.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ReplicatedResourceClient.java @@ -149,9 +149,14 @@ public Mono invokeAsync(RxDocumentServiceRequest request, ReplicatedResourceClient.STRONG_GONE_AND_RETRY_WITH_RETRY_TIMEOUT_SECONDS : ReplicatedResourceClient.GONE_AND_RETRY_WITH_TIMEOUT_IN_SECONDS; - return BackoffRetryUtility.executeAsync(funcDelegate, new GoneAndRetryWithRetryPolicy(request, retryTimeout), - inBackoffFuncDelegate, Duration.ofSeconds( - 
ReplicatedResourceClient.MIN_BACKOFF_FOR_FAILLING_BACK_TO_OTHER_REGIONS_FOR_READ_REQUESTS_IN_SECONDS), request); + return BackoffRetryUtility.executeAsync( + funcDelegate, + new GoneAndRetryWithRetryPolicy(request, retryTimeout), + inBackoffFuncDelegate, + Duration.ofSeconds( + ReplicatedResourceClient.MIN_BACKOFF_FOR_FAILLING_BACK_TO_OTHER_REGIONS_FOR_READ_REQUESTS_IN_SECONDS), + request, + addressSelector); } private Mono invokeAsync(RxDocumentServiceRequest request, TimeoutHelper timeout, diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/RetryContextOnDiagnosticTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/RetryContextOnDiagnosticTest.java index c482e3e7d9c1..cbc92eeff824 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/RetryContextOnDiagnosticTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/RetryContextOnDiagnosticTest.java @@ -18,6 +18,7 @@ import com.azure.cosmos.implementation.RxDocumentServiceResponse; import com.azure.cosmos.implementation.RxStoreModel; import com.azure.cosmos.implementation.TestConfigurations; +import com.azure.cosmos.implementation.directconnectivity.AddressSelector; import com.azure.cosmos.implementation.directconnectivity.StoreResponse; import com.azure.cosmos.models.PartitionKey; import com.azure.cosmos.rx.TestSuiteBase; @@ -45,6 +46,7 @@ public class RetryContextOnDiagnosticTest extends TestSuiteBase { private IRetryPolicy retryPolicy; private RxDocumentServiceRequest serviceRequest; + private AddressSelector addressSelector; @Test(groups = {"simple"}) public void backoffRetryUtilityExecuteRetry() throws Exception { @@ -52,6 +54,7 @@ public void backoffRetryUtilityExecuteRetry() throws Exception { Callable> callbackMethod = Mockito.mock(Callable.class); serviceRequest = RxDocumentServiceRequest.create(OperationType.Read, ResourceType.Document); retryPolicy = new TestRetryPolicy(); + addressSelector = Mockito.mock(AddressSelector.class); CosmosException 
exception = new CosmosException(410, exceptionText); Mockito.when(callbackMethod.call()).thenThrow(exception, exception, exception, exception, exception) .thenReturn(Mono.just(new StoreResponse(200, new ArrayList<>(), getUTF8BytesOrNull(responseText)))); @@ -69,6 +72,7 @@ public void backoffRetryUtilityExecuteRetryWithFailure() throws Exception { Callable> callbackMethod = Mockito.mock(Callable.class); serviceRequest = RxDocumentServiceRequest.create(OperationType.Read, ResourceType.Document); retryPolicy = new TestRetryPolicy(); + addressSelector = Mockito.mock(AddressSelector.class); CosmosException exception = new CosmosException(410, exceptionText); Mockito.when(callbackMethod.call()).thenThrow(exception); ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); @@ -89,11 +93,18 @@ public void backoffRetryUtilityExecuteAsync() { Function, Mono> parameterizedCallbackMethod = Mockito.mock(Function.class); serviceRequest = RxDocumentServiceRequest.create(OperationType.Read, ResourceType.Document); retryPolicy = new TestRetryPolicy(); + addressSelector = Mockito.mock(AddressSelector.class); CosmosException exception = new CosmosException(410, exceptionText); Mono exceptionMono = Mono.error(exception); Mockito.when(parameterizedCallbackMethod.apply(Matchers.any(Quadruple.class))).thenReturn(exceptionMono, exceptionMono, exceptionMono, exceptionMono, exceptionMono) .thenReturn(Mono.just(new StoreResponse(200, new ArrayList<>(), getUTF8BytesOrNull(responseText)))); - Mono monoResponse = BackoffRetryUtility.executeAsync(parameterizedCallbackMethod, retryPolicy, inBackoffAlternateCallbackMethod, Duration.ofSeconds(5), serviceRequest); + Mono monoResponse = BackoffRetryUtility.executeAsync( + parameterizedCallbackMethod, + retryPolicy, + inBackoffAlternateCallbackMethod, + Duration.ofSeconds(5), + serviceRequest, + addressSelector); StoreResponse response = validateSuccess(monoResponse); 
assertThat(serviceRequest.requestContext.retryContext.retryCount).isEqualTo(5); @@ -116,7 +127,13 @@ public void backoffRetryUtilityExecuteAsyncWithFailure() { executor.schedule(() -> { ((TestRetryPolicy) retryPolicy).noRetry = true; }, 10, TimeUnit.SECONDS); - Mono monoResponse = BackoffRetryUtility.executeAsync(parameterizedCallbackMethod, retryPolicy, inBackoffAlternateCallbackMethod, Duration.ofSeconds(5), serviceRequest); + Mono monoResponse = BackoffRetryUtility.executeAsync( + parameterizedCallbackMethod, + retryPolicy, + inBackoffAlternateCallbackMethod, + Duration.ofSeconds(5), + serviceRequest, + addressSelector); validateFailure(monoResponse); assertThat(serviceRequest.requestContext.retryContext.retryCount).isGreaterThanOrEqualTo(5); diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/RetryUtilsTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/RetryUtilsTest.java index 77411d587789..87a412133e50 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/RetryUtilsTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/RetryUtilsTest.java @@ -3,6 +3,7 @@ package com.azure.cosmos.implementation; import com.azure.cosmos.implementation.IRetryPolicy.ShouldRetryResult; +import com.azure.cosmos.implementation.directconnectivity.AddressSelector; import com.azure.cosmos.implementation.directconnectivity.StoreResponse; import com.azure.cosmos.implementation.directconnectivity.StoreResponseValidator; import io.reactivex.subscribers.TestSubscriber; @@ -31,6 +32,7 @@ public class RetryUtilsTest { private static final int TIMEOUT = 30000; private static final Duration BACK_OFF_DURATION = Duration.ofMillis(20); private StoreResponse storeResponse; + private AddressSelector addressSelector; @BeforeClass(groups = { "unit" }) @SuppressWarnings({"unchecked", "rawtypes"}) @@ -40,6 +42,7 @@ public void before_RetryUtilsTest() throws Exception { callbackMethod 
= Mockito.mock(Function.class); inBackoffAlternateCallbackMethod = Mockito.mock(Function.class); storeResponse = getStoreResponse(); + addressSelector = Mockito.mock(AddressSelector.class); } /** @@ -49,7 +52,7 @@ public void before_RetryUtilsTest() throws Exception { @Test(groups = { "unit" }, timeOut = TIMEOUT) public void toRetryWithAlternateFuncWithNoRetry() { Function> onErrorFunc = RetryUtils.toRetryWithAlternateFunc(callbackMethod, - retryPolicy, inBackoffAlternateCallbackMethod, minBackoffForInBackoffCallback, request); + retryPolicy, inBackoffAlternateCallbackMethod, minBackoffForInBackoffCallback, request, addressSelector); Mockito.when(retryPolicy.shouldRetry(Matchers.any())).thenReturn(Mono.just(ShouldRetryResult.noRetry())); Mono response = onErrorFunc.apply(new GoneException()); validateFailure(response, TIMEOUT, GoneException.class); @@ -63,7 +66,7 @@ public void toRetryWithAlternateFuncWithNoRetry() { @Test(groups = { "unit" }, timeOut = TIMEOUT) public void toRetryWithAlternateFuncTestingMethodOne() { Function> onErrorFunc = RetryUtils.toRetryWithAlternateFunc(callbackMethod, - retryPolicy, null, minBackoffForInBackoffCallback, request); + retryPolicy, null, minBackoffForInBackoffCallback, request, addressSelector); toggleMockFuncBtwFailureSuccess(callbackMethod); Mockito.when(retryPolicy.shouldRetry(Matchers.any())) @@ -83,7 +86,7 @@ public void toRetryWithAlternateFuncTestingMethodOne() { @Test(groups = { "unit" }, timeOut = TIMEOUT) public void toRetryWithAlternateFuncTestingMethodTwo() { Function> onErrorFunc = RetryUtils.toRetryWithAlternateFunc(callbackMethod, - retryPolicy, inBackoffAlternateCallbackMethod, minBackoffForInBackoffCallback, request); + retryPolicy, inBackoffAlternateCallbackMethod, minBackoffForInBackoffCallback, request, addressSelector); Mockito.when(callbackMethod.apply(Matchers.any())).thenReturn(Mono.error(new GoneException())); toggleMockFuncBtwFailureSuccess(inBackoffAlternateCallbackMethod); 
Mockito.when(retryPolicy.shouldRetry(Matchers.any())) diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/AddressSelectorWrapper.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/AddressSelectorWrapper.java index b8f06459529e..557fa15ff209 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/AddressSelectorWrapper.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/AddressSelectorWrapper.java @@ -287,6 +287,10 @@ public Builder(Protocol protocol) { this.protocol = protocol; } + private static AddressInformation toAddressInformation(Uri uri, boolean isPrimary, Protocol protocol) { + return new AddressInformation(true, isPrimary, uri.getURIAsString(), protocol); + } + public static class PrimaryReplicaMoveBuilder extends Builder { static PrimaryReplicaMoveBuilder create(Protocol protocol) { return new PrimaryReplicaMoveBuilder(protocol); @@ -422,7 +426,7 @@ public AddressSelectorWrapper build() { RxDocumentServiceRequest request = invocation.getArgumentAt(0, RxDocumentServiceRequest.class); boolean forceRefresh = invocation.getArgumentAt(1, Boolean.class); - ImmutableList.Builder b = ImmutableList.builder(); + ImmutableList.Builder b = ImmutableList.builder(); if (forceRefresh || refreshed.get()) { if (partitionKeyRangeFunction != null) { @@ -430,13 +434,20 @@ public AddressSelectorWrapper build() { } refreshed.set(true); - b.add(primary.getRight()); - b.addAll(secondary.stream().map(s -> s.getRight()).collect(Collectors.toList())); + b.add(toAddressInformation(primary.getRight(), true, protocol)); + b.addAll( + secondary.stream() + .map( + s -> toAddressInformation(s.getRight(), false, protocol)) + .collect(Collectors.toList())); return Mono.just(b.build()); } else { // old - b.add(primary.getLeft()); - b.addAll(secondary.stream().map(s -> 
s.getLeft()).collect(Collectors.toList())); + b.add(toAddressInformation(primary.getLeft(), true, protocol)); + b.addAll(secondary.stream() + .map( + s -> toAddressInformation(s.getLeft(), false, protocol)) + .collect(Collectors.toList())); return Mono.just(b.build()); } })).when(addressSelector).resolveAddressesAsync(Mockito.any(RxDocumentServiceRequest.class), Mockito.anyBoolean()); @@ -499,10 +510,6 @@ public AddressSelectorWrapper build() { return new AddressSelectorWrapper(this.addressSelector, this.invocationOnMockList); } - - private AddressInformation toAddressInformation(Uri uri, boolean isPrimary, Protocol protocol) { - return new AddressInformation(true, isPrimary, uri.getURIAsString(), protocol); - } } protected void capture(InvocationOnMock invocationOnMock) { diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReplicatedResourceClientGoneForWriteTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReplicatedResourceClientGoneForWriteTest.java new file mode 100644 index 000000000000..17af8b69f86c --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReplicatedResourceClientGoneForWriteTest.java @@ -0,0 +1,106 @@ +package com.azure.cosmos.implementation.directconnectivity; + +import com.azure.cosmos.BridgeInternal; +import com.azure.cosmos.ConsistencyLevel; +import com.azure.cosmos.CosmosException; +import com.azure.cosmos.implementation.Configs; +import com.azure.cosmos.implementation.DocumentServiceRequestContext; +import com.azure.cosmos.implementation.FailureValidator; +import com.azure.cosmos.implementation.GoneException; +import com.azure.cosmos.implementation.HttpConstants; +import com.azure.cosmos.implementation.IAuthorizationTokenProvider; +import com.azure.cosmos.implementation.OperationType; +import com.azure.cosmos.implementation.ResourceType; +import 
com.azure.cosmos.implementation.RxDocumentServiceRequest; +import com.azure.cosmos.implementation.SessionContainer; +import com.azure.cosmos.implementation.StoreResponseBuilder; +import org.mockito.Mockito; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; +import reactor.core.publisher.Mono; + +import java.util.ArrayList; +import java.util.List; +import java.util.function.Function; + +public class ReplicatedResourceClientGoneForWriteTest extends ReplicatedResourceClientPartitionSplitTest { + @DataProvider(name = "goneOnWriteRefreshesAddressesArgProvider") + public Object[][] goneOnWriteRefreshesAddressesArgProvider() { + return new Object[][]{ + // Consistency mode + { ConsistencyLevel.EVENTUAL}, + }; + } + + @Test(groups = { "unit" }, dataProvider = "goneOnWriteRefreshesAddressesArgProvider", timeOut = TIMEOUT) + public void gone_RefreshCache_Write(ConsistencyLevel consistencyLevel) { + + Uri primaryAddress = Uri.create("http://primary/"); + List secondaryAddresses = new ArrayList<>(); + secondaryAddresses.add(Uri.create("http://secondary-1/")); + secondaryAddresses.add(Uri.create("http://secondary-2/")); + secondaryAddresses.add(Uri.create("http://secondary-3/")); + + String partitionKeyRangeId = "1"; + + AddressSelectorWrapper addressSelectorWrapper = AddressSelectorWrapper + .Builder + .Simple + .create() + .withPrimary(primaryAddress) + .withSecondary(secondaryAddresses) + .build(); + + long lsn = 54; + long localLsn = 18; + + StoreResponse primaryResponse = StoreResponseBuilder.create() + .withLSN(lsn) + .withLocalLSN(localLsn) + .withHeader(WFConstants.BackendHeaders.QUORUM_ACKED_LOCAL_LSN, Long.toString(localLsn)) + .withRequestCharge(1.1) + .build(); + + TransportClientWrapper.Builder.UriToResultBuilder transportClientWrapperBuilder = TransportClientWrapper.Builder.uriToResultBuilder(); + + GoneException goneException = new GoneException(); +
BridgeInternal.setSendingRequestStarted(goneException, true); + transportClientWrapperBuilder + .exceptionOn(primaryAddress, OperationType.Create, ResourceType.Document, goneException, true); + + TransportClientWrapper transportClientWrapper = transportClientWrapperBuilder.build(); + + GatewayServiceConfiguratorReaderMock gatewayServiceConfigurationReaderWrapper = GatewayServiceConfiguratorReaderMock.from(ConsistencyLevel.STRONG, + 4, + 3, + 4, + 3); + + SessionContainer sessionContainer = new SessionContainer("test"); + + IAuthorizationTokenProvider authorizationTokenProvider = Mockito.mock(IAuthorizationTokenProvider.class); + ReplicatedResourceClient resourceClient = new ReplicatedResourceClient(new Configs(), + addressSelectorWrapper.addressSelector, + sessionContainer, + transportClientWrapper.transportClient, + gatewayServiceConfigurationReaderWrapper.gatewayServiceConfigurationReader, + authorizationTokenProvider, + false, + false); + + RxDocumentServiceRequest request = RxDocumentServiceRequest.createFromName( + OperationType.Create, "/dbs/db/colls/col/docs/docId", ResourceType.Document); + request.requestContext = new DocumentServiceRequestContext(); + request.getHeaders().put(HttpConstants.HttpHeaders.CONSISTENCY_LEVEL, consistencyLevel.toString()); + + Function> prepareRequestAsyncDelegate = null; + Mono storeResponseObs = resourceClient.invokeAsync(request, prepareRequestAsyncDelegate); + + FailureValidator validator = FailureValidator + .builder() + .instanceOf(CosmosException.class) + .statusCode(410).build(); + validateFailure(storeResponseObs, validator, TIMEOUT); + addressSelectorWrapper.verifyNumberOfForceCacheRefreshGreaterThanOrEqualTo(1); + } +} From 585dea7d460a1b50b601f75121a6188ae0f21610 Mon Sep 17 00:00:00 2001 From: Fabian Meiswinkel Date: Tue, 29 Sep 2020 02:01:45 +0000 Subject: [PATCH 04/12] Fixing unit test regression --- .../com/azure/cosmos/implementation/RetryUtils.java | 2 +- .../GoneAndRetryWithRetryPolicyTest.java | 11 +++-------- 
2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RetryUtils.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RetryUtils.java index 200bae6d056a..01d5a74eea9c 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RetryUtils.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RetryUtils.java @@ -92,7 +92,7 @@ public static Function> toRetryWithAlternateFunc(Function shouldRetryResult.policyArg.getValue0() != null && shouldRetryResult.policyArg.getValue0()) { - return (Mono)addressSelector.resolveAddressesAsync( + return addressSelector.resolveAddressesAsync( rxDocumentServiceRequest, true).flatMap((dummy) -> failure); } diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GoneAndRetryWithRetryPolicyTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GoneAndRetryWithRetryPolicyTest.java index 5704466c3f80..d710b5caefd3 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GoneAndRetryWithRetryPolicyTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GoneAndRetryWithRetryPolicyTest.java @@ -116,13 +116,12 @@ public void shouldRetryNotYetFlushedWriteWithGoneException() { /** * GoneException for write which is already sent to the wire, should not result in retry, - * but the request context's forceRefreshAddressCache flag should still be set to true + * but an address refresh should be triggered * shouldRetryResult. 
ShouldRetryResult */ @Test(groups = { "unit" }, timeOut = TIMEOUT) public void shouldNotRetryFlushedWriteWithGoneExceptionButForceAddressRefresh() { RxDocumentServiceRequest request = RxDocumentServiceRequest.create(OperationType.Create, ResourceType.Document); - assertThat(request.requestContext.forceRefreshAddressCache).isFalse(); Supplier goneExceptionForFlushedRequestSupplier = () -> { GoneException goneExceptionForFlushedRequest = new GoneException(); @@ -137,8 +136,8 @@ public void shouldNotRetryFlushedWriteWithGoneExceptionButForceAddressRefresh() IRetryPolicy.ShouldRetryResult shouldRetryResult = singleShouldRetry.block(); assertThat(shouldRetryResult.shouldRetry).isFalse(); - assertThat(request.requestContext.forceRefreshAddressCache).isTrue(); - assertThat(shouldRetryResult.policyArg).isNull(); + assertThat(shouldRetryResult.policyArg).isNotNull(); + assertThat(shouldRetryResult.policyArg.getValue0()).isTrue(); assertThat(shouldRetryResult.backOffTime).isNull(); } @@ -149,7 +148,6 @@ public void shouldNotRetryFlushedWriteWithGoneExceptionButForceAddressRefresh() @Test(groups = { "unit" }, timeOut = TIMEOUT) public void shouldNotRetryRequestTimeoutException() { RxDocumentServiceRequest request = RxDocumentServiceRequest.create(OperationType.Read, ResourceType.Document); - assertThat(request.requestContext.forceRefreshAddressCache).isFalse(); GoneAndRetryWithRetryPolicy goneAndRetryWithRetryPolicy = new GoneAndRetryWithRetryPolicy(request, 30); Mono singleShouldRetry = goneAndRetryWithRetryPolicy @@ -157,12 +155,10 @@ public void shouldNotRetryRequestTimeoutException() { IRetryPolicy.ShouldRetryResult shouldRetryResult = singleShouldRetry.block(); assertThat(shouldRetryResult.shouldRetry).isFalse(); - assertThat(request.requestContext.forceRefreshAddressCache).isFalse(); assertThat(shouldRetryResult.policyArg).isNull(); assertThat(shouldRetryResult.backOffTime).isNull(); request = RxDocumentServiceRequest.create(OperationType.Create, ResourceType.Document); 
- assertThat(request.requestContext.forceRefreshAddressCache).isFalse(); goneAndRetryWithRetryPolicy = new GoneAndRetryWithRetryPolicy(request, 30); singleShouldRetry = goneAndRetryWithRetryPolicy @@ -170,7 +166,6 @@ public void shouldNotRetryRequestTimeoutException() { shouldRetryResult = singleShouldRetry.block(); assertThat(shouldRetryResult.shouldRetry).isFalse(); - assertThat(request.requestContext.forceRefreshAddressCache).isFalse(); assertThat(shouldRetryResult.policyArg).isNull(); assertThat(shouldRetryResult.backOffTime).isNull(); } From 9f985dc35ff2dacc963b46376722a15ec8658000 Mon Sep 17 00:00:00 2001 From: Fabian Meiswinkel Date: Tue, 29 Sep 2020 10:11:24 +0000 Subject: [PATCH 05/12] Changing log-level to warn if we don't retry --- .../com/azure/cosmos/implementation/TestConfigurations.java | 2 +- .../directconnectivity/GoneAndRetryWithRetryPolicy.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/TestConfigurations.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/TestConfigurations.java index bb852c114816..f4e1e4bd297b 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/TestConfigurations.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/TestConfigurations.java @@ -51,7 +51,7 @@ public final class TestConfigurations { properties.getProperty("ACCOUNT_HOST", StringUtils.defaultString(Strings.emptyToNull( System.getenv().get("ACCOUNT_HOST")), - "https://localhost:443/")); + "https://localhost:8081/")); public final static String CONSISTENCY = properties.getProperty("ACCOUNT_CONSISTENCY", diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GoneAndRetryWithRetryPolicy.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GoneAndRetryWithRetryPolicy.java index 0f0d04dab814..6affdc17e24a 
100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GoneAndRetryWithRetryPolicy.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GoneAndRetryWithRetryPolicy.java @@ -64,7 +64,7 @@ public Mono shouldRetry(Exception exception) { this.request.getPartitionKeyRangeIdentity().getCollectionRid() == null)) && !(exception instanceof PartitionKeyRangeIsSplittingException)) { - logger.debug("Operation will NOT be retried. Current attempt {}, Exception: ", this.attemptCount, + logger.warn("Operation will NOT be retried. Current attempt {}, Exception: ", this.attemptCount, exception); stopStopWatch(this.durationTimer); return Mono.just(ShouldRetryResult.noRetry()); @@ -72,7 +72,7 @@ public Mono shouldRetry(Exception exception) { !request.isReadOnly() && BridgeInternal.hasSendingRequestStarted((CosmosException)exception)) { - logger.debug( + logger.warn( "Operation will NOT be retried. Write operations can not be retried safely when sending the request " + "to the service because they aren't idempotent. 
Current attempt {}, Exception: ", this.attemptCount, From 31c603510f672116d4f58e0f6716959d445387b2 Mon Sep 17 00:00:00 2001 From: Fabian Meiswinkel Date: Tue, 29 Sep 2020 19:30:32 +0000 Subject: [PATCH 06/12] Fixing VI failure unrelated to my changes --- .../java/com/azure/cosmos/CosmosDiagnosticsTest.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosDiagnosticsTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosDiagnosticsTest.java index 949a4a48b2ae..3dedf397aba4 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosDiagnosticsTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosDiagnosticsTest.java @@ -619,10 +619,15 @@ private void validateRntbdStatistics(CosmosDiagnostics cosmosDiagnostics, // first request initialized the rntbd service endpoint assertThat(Instant.parse(serviceEndpointStatistics.get("createdTime").asText())).isAfterOrEqualTo(beforeInitializingRntbdServiceEndpoint); - assertThat(Instant.parse(serviceEndpointStatistics.get("createdTime").asText())).isBeforeOrEqualTo(afterInitializingRntbdServiceEndpoint); - assertThat(Instant.parse(serviceEndpointStatistics.get("lastRequestTime").asText())).isAfterOrEqualTo(beforeOperation2).isBeforeOrEqualTo(afterOperation2); - assertThat(Instant.parse(serviceEndpointStatistics.get("lastSuccessfulRequestTime").asText())).isAfterOrEqualTo(beforeOperation2).isBeforeOrEqualTo(afterOperation2); + // Adding 1 ms to cover for rounding errors (only 3 fractional digits) + Instant afterInitializationThreshold = afterInitializingRntbdServiceEndpoint.plusMillis(1); + assertThat(Instant.parse(serviceEndpointStatistics.get("createdTime").asText())).isBeforeOrEqualTo(afterInitializationThreshold); + + // Adding 1 ms to cover for rounding errors (only 3 fractional digits) + Instant afterOperation2Threshold = afterOperation2.plusMillis(1); + 
assertThat(Instant.parse(serviceEndpointStatistics.get("lastRequestTime").asText())).isAfterOrEqualTo(beforeOperation2).isBeforeOrEqualTo(afterOperation2Threshold); + assertThat(Instant.parse(serviceEndpointStatistics.get("lastSuccessfulRequestTime").asText())).isAfterOrEqualTo(beforeOperation2).isBeforeOrEqualTo(afterOperation2Threshold); } private void validate(CosmosDiagnostics cosmosDiagnostics, int expectedRequestPayloadSize, int expectedResponsePayloadSize) throws Exception { From b3368ad8bca99696797bd7ae9afb909fc39828d9 Mon Sep 17 00:00:00 2001 From: Fabian Meiswinkel Date: Tue, 29 Sep 2020 21:59:46 +0000 Subject: [PATCH 07/12] Update log4j2.properties --- .../src/main/resources/log4j2.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/cosmos/azure-cosmos-dotnet-benchmark/src/main/resources/log4j2.properties b/sdk/cosmos/azure-cosmos-dotnet-benchmark/src/main/resources/log4j2.properties index 5955fd5703b6..d1cbc7606e6c 100644 --- a/sdk/cosmos/azure-cosmos-dotnet-benchmark/src/main/resources/log4j2.properties +++ b/sdk/cosmos/azure-cosmos-dotnet-benchmark/src/main/resources/log4j2.properties @@ -20,6 +20,6 @@ appender.console.layout.pattern = %d %5X{pid} [%t] %-5p %c - %m%n #appender.logfile.name = FILE #appender.logfile.type = File -#appender.logfile.filename = ${logDirectory}/azure-cosmos-benchmark.log +#appender.logfile.filename = ${logDirectory}/azure-cosmos-dotnet-benchmark.log #appender.logfile.layout.type = PatternLayout #appender.logfile.layout.pattern = [%d][%p][${hostName}][thread:%t][logger:%c] %m%n From 97f95273a025bb853a52b564cd86e9fc0acac3af Mon Sep 17 00:00:00 2001 From: Fabian Meiswinkel Date: Wed, 30 Sep 2020 18:10:02 +0000 Subject: [PATCH 08/12] Fixing unit test flakiness --- .../com/azure/cosmos/CosmosDiagnosticsTest.java | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosDiagnosticsTest.java 
b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosDiagnosticsTest.java index 3dedf397aba4..65658e6f1a0b 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosDiagnosticsTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosDiagnosticsTest.java @@ -618,16 +618,24 @@ private void validateRntbdStatistics(CosmosDiagnostics cosmosDiagnostics, assertThat(serviceEndpointStatistics.get("isClosed").asBoolean()).isEqualTo(false); // first request initialized the rntbd service endpoint - assertThat(Instant.parse(serviceEndpointStatistics.get("createdTime").asText())).isAfterOrEqualTo(beforeInitializingRntbdServiceEndpoint); + Instant beforeInitializationThreshold = beforeInitializingRntbdServiceEndpoint.minusMillis(1); + assertThat(Instant.parse(serviceEndpointStatistics.get("createdTime").asText())) + .isAfterOrEqualTo(beforeInitializationThreshold); // Adding 1 ms to cover for rounding errors (only 3 fractional digits) Instant afterInitializationThreshold = afterInitializingRntbdServiceEndpoint.plusMillis(1); - assertThat(Instant.parse(serviceEndpointStatistics.get("createdTime").asText())).isBeforeOrEqualTo(afterInitializationThreshold); + assertThat(Instant.parse(serviceEndpointStatistics.get("createdTime").asText())) + .isBeforeOrEqualTo(afterInitializationThreshold); // Adding 1 ms to cover for rounding errors (only 3 fractional digits) Instant afterOperation2Threshold = afterOperation2.plusMillis(1); - assertThat(Instant.parse(serviceEndpointStatistics.get("lastRequestTime").asText())).isAfterOrEqualTo(beforeOperation2).isBeforeOrEqualTo(afterOperation2Threshold); - assertThat(Instant.parse(serviceEndpointStatistics.get("lastSuccessfulRequestTime").asText())).isAfterOrEqualTo(beforeOperation2).isBeforeOrEqualTo(afterOperation2Threshold); + Instant beforeOperation2Threshold = beforeOperation2.minusMillis(1); + assertThat(Instant.parse(serviceEndpointStatistics.get("lastRequestTime").asText())) + 
.isAfterOrEqualTo(beforeOperation2Threshold) + .isBeforeOrEqualTo(afterOperation2Threshold); + assertThat(Instant.parse(serviceEndpointStatistics.get("lastSuccessfulRequestTime").asText())) + .isAfterOrEqualTo(beforeOperation2Threshold) + .isBeforeOrEqualTo(afterOperation2Threshold); } private void validate(CosmosDiagnostics cosmosDiagnostics, int expectedRequestPayloadSize, int expectedResponsePayloadSize) throws Exception { From 9cefd308bfeec9a3e4d089adb9aa8d15d87fd747 Mon Sep 17 00:00:00 2001 From: Fabian Meiswinkel Date: Wed, 30 Sep 2020 18:20:09 +0000 Subject: [PATCH 09/12] Merging with Mo's Diagnostics PR --- .../ReplicatedResourceClientGoneForWriteTest.java | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReplicatedResourceClientGoneForWriteTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReplicatedResourceClientGoneForWriteTest.java index 17af8b69f86c..644529dd86d9 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReplicatedResourceClientGoneForWriteTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReplicatedResourceClientGoneForWriteTest.java @@ -23,6 +23,8 @@ import java.util.List; import java.util.function.Function; +import static com.azure.cosmos.implementation.TestUtils.mockDiagnosticsClientContext; + public class ReplicatedResourceClientGoneForWriteTest extends ReplicatedResourceClientPartitionSplitTest { @DataProvider(name = "goneOnWriteRefreshesAddressesArgProvider") public Object[][] goneOnWriteRefreshesAddressesArgProvider() { @@ -79,7 +81,9 @@ public void gone_RefreshCache_Write(ConsistencyLevel consistencyLevel) { SessionContainer sessionContainer = new SessionContainer("test"); IAuthorizationTokenProvider authorizationTokenProvider = 
Mockito.mock(IAuthorizationTokenProvider.class); - ReplicatedResourceClient resourceClient = new ReplicatedResourceClient(new Configs(), + ReplicatedResourceClient resourceClient = new ReplicatedResourceClient( + mockDiagnosticsClientContext(), + new Configs(), addressSelectorWrapper.addressSelector, sessionContainer, transportClientWrapper.transportClient, @@ -89,7 +93,10 @@ public void gone_RefreshCache_Write(ConsistencyLevel consistencyLevel) { false); RxDocumentServiceRequest request = RxDocumentServiceRequest.createFromName( - OperationType.Create, "/dbs/db/colls/col/docs/docId", ResourceType.Document); + mockDiagnosticsClientContext(), + OperationType.Create, + "/dbs/db/colls/col/docs/docId", + ResourceType.Document); request.requestContext = new DocumentServiceRequestContext(); request.getHeaders().put(HttpConstants.HttpHeaders.CONSISTENCY_LEVEL, consistencyLevel.toString()); From 45cfea1f34112d4f90f274453b9ca09b7ed24a14 Mon Sep 17 00:00:00 2001 From: Fabian Meiswinkel Date: Wed, 30 Sep 2020 22:48:14 +0000 Subject: [PATCH 10/12] Moving the address refresh to a background task for writes hitting 410 --- .../cosmos/implementation/RetryUtils.java | 19 +++++++++++++++--- ...licatedResourceClientGoneForWriteTest.java | 20 ++++++++++++++++--- 2 files changed, 33 insertions(+), 6 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RetryUtils.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RetryUtils.java index 01d5a74eea9c..f1c2c8c37637 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RetryUtils.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RetryUtils.java @@ -9,6 +9,7 @@ import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; import java.time.Duration; import java.util.function.Function; @@ -92,9 +93,7 @@ public static Function> 
toRetryWithAlternateFunc(Function shouldRetryResult.policyArg.getValue0() != null && shouldRetryResult.policyArg.getValue0()) { - return addressSelector.resolveAddressesAsync( - rxDocumentServiceRequest, - true).flatMap((dummy) -> failure); + startBackgroundAddressRefresh(rxDocumentServiceRequest, addressSelector); } return failure; @@ -122,6 +121,20 @@ public static Function> toRetryWithAlternateFunc(Function }; } + private static void startBackgroundAddressRefresh( + RxDocumentServiceRequest request, + AddressSelector addressSelector) { + + addressSelector.resolveAddressesAsync(request, true) + .publishOn(Schedulers.elastic()) + .subscribe( + r -> { + }, + e -> logger.warn( + "Background refresh of addresses failed with {}", e.getMessage(), e) + ); + } + private static Mono recursiveFunc( Function, Mono> callbackMethod, IRetryPolicy retryPolicy, diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReplicatedResourceClientGoneForWriteTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReplicatedResourceClientGoneForWriteTest.java index 644529dd86d9..5e0acb54edc0 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReplicatedResourceClientGoneForWriteTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReplicatedResourceClientGoneForWriteTest.java @@ -25,7 +25,7 @@ import static com.azure.cosmos.implementation.TestUtils.mockDiagnosticsClientContext; -public class ReplicatedResourceClientGoneForWriteTest extends ReplicatedResourceClientPartitionSplitTest { +public class ReplicatedResourceClientGoneForWriteTest { @DataProvider(name = "goneOnWriteRefreshesAddressesArgProvider") public Object[][] goneOnWriteRefreshesAddressesArgProvider() { return new Object[][]{ @@ -34,7 +34,10 @@ public Object[][] goneOnWriteRefreshesAddressesArgProvider() { }; } - @Test(groups = { "unit" 
}, dataProvider = "goneOnWriteRefreshesAddressesArgProvider", timeOut = TIMEOUT) + @Test( + groups = { "unit" }, + dataProvider = "goneOnWriteRefreshesAddressesArgProvider", + timeOut = ReplicatedResourceClientPartitionSplitTest.TIMEOUT) public void gone_RefreshCache_Write(ConsistencyLevel consistencyLevel) { Uri primaryAddress = Uri.create("http://primary/"); @@ -103,11 +106,22 @@ public void gone_RefreshCache_Write(ConsistencyLevel consistencyLevel) { Function> prepareRequestAsyncDelegate = null; Mono storeResponseObs = resourceClient.invokeAsync(request, prepareRequestAsyncDelegate); + // Address refresh is happening in the background - allowing some time to finish the refresh + // Because this is all using mocking (no emulator) the delay of a couple hundred ms should be sufficient + try { + Thread.sleep(500); + } catch (InterruptedException e) { + e.printStackTrace(); + } + FailureValidator validator = FailureValidator .builder() .instanceOf(CosmosException.class) .statusCode(410).build(); - validateFailure(storeResponseObs, validator, TIMEOUT); + ReplicatedResourceClientPartitionSplitTest.validateFailure( + storeResponseObs, + validator, + ReplicatedResourceClientPartitionSplitTest.TIMEOUT); addressSelectorWrapper.verifyNumberOfForceCacheRefreshGreaterThanOrEqualTo(1); } } From 6ad514f79862c51e6d1e5eb027db5d9690cc27d5 Mon Sep 17 00:00:00 2001 From: Fabian Meiswinkel Date: Thu, 1 Oct 2020 00:28:11 +0000 Subject: [PATCH 11/12] Addressed Kushagra's PR comments --- .../java/com/azure/cosmos/implementation/RetryUtils.java | 9 +++++---- .../directconnectivity/rntbd/RntbdRequestRecord.java | 2 +- .../ReplicatedResourceClientGoneForWriteTest.java | 3 +++ 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RetryUtils.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RetryUtils.java index f1c2c8c37637..491058ba56ed 100644 --- 
a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RetryUtils.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RetryUtils.java @@ -89,11 +89,12 @@ public static Function> toRetryWithAlternateFunc(Function final Throwable errorToReturn = shouldRetryResult.exception != null ? shouldRetryResult.exception : e; final Mono failure = Mono.error(errorToReturn); - if (shouldRetryResult.policyArg != null && - shouldRetryResult.policyArg.getValue0() != null && - shouldRetryResult.policyArg.getValue0()) { + if (shouldRetryResult.policyArg != null) { + Boolean forceAddressRefresh = shouldRetryResult.policyArg.getValue0(); - startBackgroundAddressRefresh(rxDocumentServiceRequest, addressSelector); + if (forceAddressRefresh != null && forceAddressRefresh) { + startBackgroundAddressRefresh(rxDocumentServiceRequest, addressSelector); + } } return failure; diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestRecord.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestRecord.java index 72caa6a321f8..47483cd3c3b6 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestRecord.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestRecord.java @@ -224,7 +224,7 @@ public boolean expire() { /** * Provides information whether the request could have been sent to the service - * @return fals if it is possible to guarantee that the request never arrived at the service - true otherwise + * @return false if it is possible to guarantee that the request never arrived at the service - true otherwise */ public boolean hasSendingRequestStarted() { return this.sendingRequestHasStarted; diff --git 
a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReplicatedResourceClientGoneForWriteTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReplicatedResourceClientGoneForWriteTest.java index 5e0acb54edc0..ba4051a4eda6 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReplicatedResourceClientGoneForWriteTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReplicatedResourceClientGoneForWriteTest.java @@ -1,3 +1,6 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + package com.azure.cosmos.implementation.directconnectivity; import com.azure.cosmos.BridgeInternal; From 6f0b1421e816f854144095439ad4872317015901 Mon Sep 17 00:00:00 2001 From: Fabian Meiswinkel Date: Thu, 1 Oct 2020 22:55:51 +0000 Subject: [PATCH 12/12] Addressing benchmark flakiness introduced due to not retrying writes which have been sent --- .../cosmos/benchmark/AsyncBenchmark.java | 44 ++++++++++++++++--- .../cosmos/benchmark/ReadMyWriteWorkflow.java | 34 +++++++++++++- 2 files changed, 71 insertions(+), 7 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncBenchmark.java b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncBenchmark.java index b97dca984ec4..66f64cd8f098 100644 --- a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncBenchmark.java +++ b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncBenchmark.java @@ -13,6 +13,7 @@ import com.azure.cosmos.DirectConnectionConfig; import com.azure.cosmos.GatewayConnectionConfig; import com.azure.cosmos.implementation.HttpConstants; +import com.azure.cosmos.models.PartitionKey; import com.azure.cosmos.models.ThroughputProperties; import 
com.codahale.metrics.ConsoleReporter; import com.codahale.metrics.CsvReporter; @@ -35,6 +36,7 @@ import reactor.core.publisher.BaseSubscriber; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.util.retry.Retry; import java.net.InetSocketAddress; import java.time.Duration; @@ -143,11 +145,43 @@ abstract class AsyncBenchmark { dataFieldValue, partitionKey, configuration.getDocumentDataFieldCount()); - Flux obs = cosmosAsyncContainer.createItem(newDoc).map(resp -> { - PojoizedJson x = - resp.getItem(); - return x; - }).flux(); + Flux obs = cosmosAsyncContainer + .createItem(newDoc) + .retryWhen(Retry.max(5).filter((error) -> { + if (!(error instanceof CosmosException)) { + return false; + } + final CosmosException cosmosException = (CosmosException)error; + if (cosmosException.getStatusCode() == 410 || + cosmosException.getStatusCode() == 408 || + cosmosException.getStatusCode() == 429 || + cosmosException.getStatusCode() == 503) { + return true; + } + + return false; + })) + .onErrorResume( + (error) -> { + if (!(error instanceof CosmosException)) { + return false; + } + final CosmosException cosmosException = (CosmosException)error; + if (cosmosException.getStatusCode() == 409) { + return true; + } + + return false; + }, + (conflictException) -> cosmosAsyncContainer.readItem( + uuid, new PartitionKey(partitionKey), PojoizedJson.class) + ) + .map(resp -> { + PojoizedJson x = + resp.getItem(); + return x; + }) + .flux(); createDocumentObservables.add(obs); } } diff --git a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/ReadMyWriteWorkflow.java b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/ReadMyWriteWorkflow.java index 0b092c294054..402fee481e2f 100644 --- a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/ReadMyWriteWorkflow.java +++ b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/ReadMyWriteWorkflow.java @@ -5,6 +5,7 
@@ import com.azure.cosmos.BridgeInternal; import com.azure.cosmos.CosmosBridgeInternal; +import com.azure.cosmos.CosmosException; import com.azure.cosmos.implementation.AsyncDocumentClient; import com.azure.cosmos.implementation.Database; import com.azure.cosmos.implementation.Document; @@ -21,6 +22,7 @@ import reactor.core.publisher.BaseSubscriber; import reactor.core.publisher.Flux; import reactor.core.scheduler.Schedulers; +import reactor.util.retry.Retry; import java.util.ArrayList; import java.util.Arrays; @@ -194,8 +196,36 @@ private Flux writeDocument(Integer i) { Integer key = i == null ? cacheKey() : i; return client.createDocument(getCollectionLink(), document, null, false) - .doOnNext(r -> cache.put(key, r.getResource())) - .map(ResourceResponse::getResource).flux(); + .retryWhen(Retry.max(5).filter((error) -> { + if (!(error instanceof CosmosException)) { + return false; + } + final CosmosException cosmosException = (CosmosException)error; + if (cosmosException.getStatusCode() == 410 || + cosmosException.getStatusCode() == 408 || + cosmosException.getStatusCode() == 429 || + cosmosException.getStatusCode() == 503) { + return true; + } + + return false; + })) + .onErrorResume( + (error) -> { + if (!(error instanceof CosmosException)) { + return false; + } + final CosmosException cosmosException = (CosmosException)error; + if (cosmosException.getStatusCode() == 409) { + return true; + } + + return false; + }, + (conflictException) -> client.readDocument(getDocumentLink(document), null) + ) + .doOnNext(r -> cache.put(key, r.getResource())) + .map(ResourceResponse::getResource).flux(); } /**