From d34b98edc1c8f0d0c2c6286945312662c27ae964 Mon Sep 17 00:00:00 2001 From: milismsft-mac Date: Thu, 7 Nov 2019 19:13:10 -0800 Subject: [PATCH] ChangeFeedProcessor: fix a race condition issue where the lease document has been updated while we are trying to write the current state like when check-pointing the continuation token - see https://github.com/Azure/azure-sdk-for-java/pull/6205/files --- .../changefeed/ChangeFeedObserverContext.java | 2 +- .../changefeed/PartitionCheckpointer.java | 2 +- .../exceptions/LeaseConflictException.java | 69 +++++++++++++++++++ .../exceptions/LeaseLostException.java | 15 ++-- .../implementation/AutoCheckpointer.java | 12 +++- .../ChangeFeedObserverContextImpl.java | 3 +- .../implementation/DefaultObserver.java | 10 ++- .../DocumentServiceLeaseUpdaterImpl.java | 27 ++++++-- .../implementation/LeaseRenewerImpl.java | 3 +- .../implementation/LeaseStoreManagerImpl.java | 6 +- ...onWrappingChangeFeedObserverDecorator.java | 9 ++- .../PartitionCheckpointerImpl.java | 5 +- .../PartitionProcessorImpl.java | 42 +++++++---- sdk/cosmos/changelog/README.md | 1 + 14 files changed, 162 insertions(+), 44 deletions(-) create mode 100644 sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/changefeed/exceptions/LeaseConflictException.java diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/changefeed/ChangeFeedObserverContext.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/changefeed/ChangeFeedObserverContext.java index 6e6cbe951270..f4f6b7c02a44 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/changefeed/ChangeFeedObserverContext.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/changefeed/ChangeFeedObserverContext.java @@ -34,5 +34,5 @@ public interface ChangeFeedObserverContext { * * @return a representation of the deferred computation of this call. 
*/ - Mono checkpoint(); + Mono checkpoint(); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/changefeed/PartitionCheckpointer.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/changefeed/PartitionCheckpointer.java index 44fb8aa3ba20..8f9cbd7b05ee 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/changefeed/PartitionCheckpointer.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/changefeed/PartitionCheckpointer.java @@ -14,5 +14,5 @@ public interface PartitionCheckpointer { * @param сontinuationToken the continuation token. * @return a deferred operation of this call. */ - Mono checkpointPartition(String сontinuationToken); + Mono checkpointPartition(String сontinuationToken); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/changefeed/exceptions/LeaseConflictException.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/changefeed/exceptions/LeaseConflictException.java new file mode 100644 index 000000000000..9523684d430a --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/changefeed/exceptions/LeaseConflictException.java @@ -0,0 +1,69 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.azure.cosmos.internal.changefeed.exceptions; + +import com.azure.cosmos.internal.changefeed.Lease; + +/** + * Exception occurred when the lease was updated by a different thread or worker while current thread is trying to update it as well. + */ +public class LeaseConflictException extends RuntimeException { + private static final String DEFAULT_MESSAGE = "The lease was updated by a different worker."; + private Lease lease; + + /** + * Initializes a new instance of the @link LeaseConflictException} class. 
+     */
+    public LeaseConflictException() {
+    }
+
+    /**
+     * Initializes a new instance of the {@link LeaseConflictException} class using the specified lease.
+     *
+     * @param lease the lease involved in the update conflict.
+     */
+    public LeaseConflictException(Lease lease) {
+        super(DEFAULT_MESSAGE);
+        this.lease = lease;
+    }
+
+    /**
+     * Initializes a new instance of the {@link LeaseConflictException} class using the specified lease and error message.
+     *
+     * @param lease the lease involved in the update conflict.
+     * @param message the exception error message.
+     */
+    public LeaseConflictException(Lease lease, String message) {
+        super(message);
+        this.lease = lease;
+    }
+
+    /**
+     * Initializes a new instance of the {@link LeaseConflictException} class using error message.
+     *
+     * @param message the exception error message.
+     */
+    public LeaseConflictException(String message) {
+        super(message);
+    }
+
+    /**
+     * Initializes a new instance of the {@link LeaseConflictException} class using error message and inner exception.
+     *
+     * @param message the exception error message.
+     * @param innerException the exception that led to this one; it is recorded as the cause and may be {@code null}.
+     *
+     */
+    public LeaseConflictException(String message, Exception innerException) {
+        super(message, innerException);
+    }
+
+    /**
+     * Gets the lease involved in the update conflict.
+     *
+     * @return the lease involved in the update conflict, or {@code null} when none was supplied.
+ */ + public Lease getLease() { + return this.lease; + } +} \ No newline at end of file diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/changefeed/exceptions/LeaseLostException.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/changefeed/exceptions/LeaseLostException.java index 0a7f2a4da186..264d5d7f7589 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/changefeed/exceptions/LeaseLostException.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/changefeed/exceptions/LeaseLostException.java @@ -17,8 +17,7 @@ public class LeaseLostException extends RuntimeException { /** * Initializes a new instance of the @link LeaseLostException} class. */ - public LeaseLostException() - { + public LeaseLostException() { } /** @@ -26,8 +25,7 @@ public LeaseLostException() * * @param lease an instance of a lost lease. */ - public LeaseLostException(Lease lease) - { + public LeaseLostException(Lease lease) { super(DEFAULT_MESSAGE); this.lease = lease; } @@ -37,8 +35,7 @@ public LeaseLostException(Lease lease) * * @param message the exception error message. */ - public LeaseLostException(String message) - { + public LeaseLostException(String message) { super(message); } @@ -49,8 +46,7 @@ public LeaseLostException(String message) * @param innerException the inner exception. * */ - public LeaseLostException(String message, Exception innerException) - { + public LeaseLostException(String message, Exception innerException) { super(message, innerException.getCause()); } @@ -62,8 +58,7 @@ public LeaseLostException(String message, Exception innerException) * @param innerException the inner exception. * @param isGone true if lease doesn't exist. 
*/ - public LeaseLostException(Lease lease, Exception innerException, boolean isGone) - { + public LeaseLostException(Lease lease, Exception innerException, boolean isGone) { super(DEFAULT_MESSAGE, innerException.getCause()); this.lease = lease; this.isGone = isGone; diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/changefeed/implementation/AutoCheckpointer.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/changefeed/implementation/AutoCheckpointer.java index f1a914b3a0bc..710415cc39b8 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/changefeed/implementation/AutoCheckpointer.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/changefeed/implementation/AutoCheckpointer.java @@ -7,6 +7,8 @@ import com.azure.cosmos.internal.changefeed.ChangeFeedObserverCloseReason; import com.azure.cosmos.internal.changefeed.ChangeFeedObserverContext; import com.azure.cosmos.internal.changefeed.CheckpointFrequency; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; import java.time.Duration; @@ -18,6 +20,7 @@ * Auto check-pointer implementation for {@link ChangeFeedObserver}. 
*/ class AutoCheckpointer implements ChangeFeedObserver { + private final Logger logger = LoggerFactory.getLogger(AutoCheckpointer.class); private final CheckpointFrequency checkpointFrequency; private final ChangeFeedObserver observer; private volatile int processedDocCount; @@ -50,6 +53,9 @@ public void close(ChangeFeedObserverContext context, ChangeFeedObserverCloseReas @Override public Mono processChanges(ChangeFeedObserverContext context, List docs) { return this.observer.processChanges(context, docs) + .doOnError(throwable -> { + logger.warn("Unexpected exception from thread {}", Thread.currentThread().getId(), throwable); + }) .then(this.afterProcessChanges(context)); } @@ -58,10 +64,14 @@ private Mono afterProcessChanges(ChangeFeedObserverContext context) { if (this.isCheckpointNeeded()) { return context.checkpoint() + .doOnError(throwable -> { + logger.warn("Checkpoint failed; this worker will be killed", throwable); + }) .doOnSuccess((Void) -> { this.processedDocCount = 0; this.lastCheckpointTime = ZonedDateTime.now(ZoneId.of("UTC")); - }); + }) + .then(); } return Mono.empty(); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/changefeed/implementation/ChangeFeedObserverContextImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/changefeed/implementation/ChangeFeedObserverContextImpl.java index a51e76a56e7d..aa601ee4f60b 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/changefeed/implementation/ChangeFeedObserverContextImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/changefeed/implementation/ChangeFeedObserverContextImpl.java @@ -6,6 +6,7 @@ import com.azure.cosmos.CosmosItemProperties; import com.azure.cosmos.FeedResponse; import com.azure.cosmos.internal.changefeed.ChangeFeedObserverContext; +import com.azure.cosmos.internal.changefeed.Lease; import com.azure.cosmos.internal.changefeed.PartitionCheckpointer; import 
reactor.core.publisher.Mono; @@ -40,7 +41,7 @@ public ChangeFeedObserverContextImpl(String leaseToken, FeedResponse checkpoint() { + public Mono checkpoint() { this.responseContinuation = this.feedResponse.getContinuationToken(); return this.checkpointer.checkpointPartition(this.responseContinuation); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/changefeed/implementation/DefaultObserver.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/changefeed/implementation/DefaultObserver.java index 3224fc974557..fb1c6af382e8 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/changefeed/implementation/DefaultObserver.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/changefeed/implementation/DefaultObserver.java @@ -34,9 +34,13 @@ public void close(ChangeFeedObserverContext context, ChangeFeedObserverCloseReas @Override public Mono processChanges(ChangeFeedObserverContext context, List docs) { log.info("Start processing from thread {}", Thread.currentThread().getId()); - consumer.accept(docs); - log.info("Done processing from thread {}", Thread.currentThread().getId()); - + try { + consumer.accept(docs); + log.info("Done processing from thread {}", Thread.currentThread().getId()); + } catch (Exception ex) { + log.warn("Unexpected exception thrown from thread {}", Thread.currentThread().getId(), ex); + return Mono.error(ex); + } return Mono.empty(); } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/changefeed/implementation/DocumentServiceLeaseUpdaterImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/changefeed/implementation/DocumentServiceLeaseUpdaterImpl.java index 09d65e2119ea..c5a3daa6ecae 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/changefeed/implementation/DocumentServiceLeaseUpdaterImpl.java +++ 
b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/changefeed/implementation/DocumentServiceLeaseUpdaterImpl.java @@ -12,6 +12,7 @@ import com.azure.cosmos.internal.changefeed.Lease; import com.azure.cosmos.internal.changefeed.ServiceItemLease; import com.azure.cosmos.internal.changefeed.ServiceItemLeaseUpdater; +import com.azure.cosmos.internal.changefeed.exceptions.LeaseConflictException; import com.azure.cosmos.internal.changefeed.exceptions.LeaseLostException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,21 +79,39 @@ public Mono updateLease(Lease cachedLease, CosmosAsyncItem itemLink, Cosm CosmosItemProperties document = cosmosItemResponse.getProperties(); ServiceItemLease serverLease = ServiceItemLease.fromDocument(document); logger.info( - "Partition {} update failed because the lease with token '{}' was updated by host '{}' with token '{}'.", + "Partition {} update failed because the lease with token '{}' was updated by owner '{}' with token '{}'.", arrayLease[0].getLeaseToken(), arrayLease[0].getConcurrencyToken(), serverLease.getOwner(), serverLease.getConcurrencyToken()); arrayLease[0] = serverLease; - throw new RuntimeException("Partition update failed"); + throw new LeaseConflictException(arrayLease[0], "Partition update failed"); }); }) .retry(RETRY_COUNT_ON_CONFLICT, throwable -> { - if (throwable instanceof RuntimeException) { - return throwable instanceof LeaseLostException; + if (throwable instanceof LeaseConflictException) { + logger.info( + "Partition {} for the lease with token '{}' failed to update for owner '{}'; will retry.", + arrayLease[0].getLeaseToken(), + arrayLease[0].getConcurrencyToken(), + arrayLease[0].getOwner()); + return true; } return false; + }) + .onErrorResume(throwable -> { + if (throwable instanceof LeaseConflictException) { + logger.warn( + "Partition {} for the lease with token '{}' failed to update for owner '{}'; current continuation token '{}'.", + arrayLease[0].getLeaseToken(), + 
arrayLease[0].getConcurrencyToken(), + arrayLease[0].getOwner(), + arrayLease[0].getContinuationToken(), throwable); + + return Mono.just(arrayLease[0]); + } + return Mono.error(throwable); }); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/changefeed/implementation/LeaseRenewerImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/changefeed/implementation/LeaseRenewerImpl.java index 668dfbfc40ff..95960919f8f2 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/changefeed/implementation/LeaseRenewerImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/changefeed/implementation/LeaseRenewerImpl.java @@ -62,13 +62,12 @@ public Mono run(CancellationToken cancellationToken) { return !cancellationToken.isCancellationRequested(); }) .then() - .onErrorResume(throwable -> { + .doOnError(throwable -> { if (throwable instanceof LeaseLostException) { logger.info("Partition {}: renew lease loop failed.", this.lease.getLeaseToken(), throwable); } else { logger.error("Partition {}: renew lease loop failed.", this.lease.getLeaseToken(), throwable); } - return Mono.error(throwable); }); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/changefeed/implementation/LeaseStoreManagerImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/changefeed/implementation/LeaseStoreManagerImpl.java index a23415ffa68a..15497bc5d950 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/changefeed/implementation/LeaseStoreManagerImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/changefeed/implementation/LeaseStoreManagerImpl.java @@ -379,7 +379,11 @@ public Mono checkpoint(Lease lease, String continuationToken) { serverLease.setContinuationToken(continuationToken); return serverLease; - })); + })) + .doOnError(throwable -> { + logger.info("Partition {} lease with token '{}' failed to checkpoint 
for owner '{}' with continuation token '{}'", + lease.getLeaseToken(), lease.getConcurrencyToken(), lease.getOwner(), lease.getContinuationToken()); + }); } @Override diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/changefeed/implementation/ObserverExceptionWrappingChangeFeedObserverDecorator.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/changefeed/implementation/ObserverExceptionWrappingChangeFeedObserverDecorator.java index 62f1e66459ae..003fb5952f58 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/changefeed/implementation/ObserverExceptionWrappingChangeFeedObserverDecorator.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/changefeed/implementation/ObserverExceptionWrappingChangeFeedObserverDecorator.java @@ -32,7 +32,7 @@ public void open(ChangeFeedObserverContext context) { } catch (RuntimeException userException) { - this.logger.warn("Exception happened on ChangeFeedObserver.open", userException); + this.logger.warn("Exception thrown during ChangeFeedObserver.open from thread {}", Thread.currentThread().getId(), userException); throw new ObserverException(userException); } } @@ -44,7 +44,7 @@ public void close(ChangeFeedObserverContext context, ChangeFeedObserverCloseReas } catch (RuntimeException userException) { - this.logger.warn("Exception happened on ChangeFeedObserver.close", userException); + this.logger.warn("Exception thrown during ChangeFeedObserver.close from thread {}", Thread.currentThread().getId(), userException); throw new ObserverException(userException); } } @@ -52,9 +52,8 @@ public void close(ChangeFeedObserverContext context, ChangeFeedObserverCloseReas @Override public Mono processChanges(ChangeFeedObserverContext context, List docs) { return this.changeFeedObserver.processChanges(context, docs) - .onErrorResume(throwable -> { - this.logger.warn("Exception happened on ChangeFeedObserver.processChanges", throwable); - return 
Mono.error(new ObserverException(throwable)); + .doOnError(throwable -> { + this.logger.warn("Exception thrown during ChangeFeedObserver.processChanges from thread {}", Thread.currentThread().getId(), throwable); }); } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/changefeed/implementation/PartitionCheckpointerImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/changefeed/implementation/PartitionCheckpointerImpl.java index e103e943af00..47dea078fbf2 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/changefeed/implementation/PartitionCheckpointerImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/changefeed/implementation/PartitionCheckpointerImpl.java @@ -23,13 +23,12 @@ public PartitionCheckpointerImpl(LeaseCheckpointer leaseCheckpointer, Lease leas } @Override - public Mono checkpointPartition(String сontinuationToken) { + public Mono checkpointPartition(String сontinuationToken) { return this.leaseCheckpointer.checkpoint(this.lease, сontinuationToken) .map(lease1 -> { this.lease = lease1; logger.info("Checkpoint: partition {}, new continuation {}", this.lease.getLeaseToken(), this.lease.getContinuationToken()); return lease1; - }) - .then(); + }); } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/changefeed/implementation/PartitionProcessorImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/changefeed/implementation/PartitionProcessorImpl.java index e2f204ef93ea..5de7b63b5be3 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/changefeed/implementation/PartitionProcessorImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/changefeed/implementation/PartitionProcessorImpl.java @@ -13,6 +13,7 @@ import com.azure.cosmos.internal.changefeed.PartitionCheckpointer; import com.azure.cosmos.internal.changefeed.PartitionProcessor; import 
com.azure.cosmos.internal.changefeed.ProcessorSettings; +import com.azure.cosmos.internal.changefeed.exceptions.LeaseLostException; import com.azure.cosmos.internal.changefeed.exceptions.PartitionNotFoundException; import com.azure.cosmos.internal.changefeed.exceptions.PartitionSplitException; import com.azure.cosmos.internal.changefeed.exceptions.TaskCancelledException; @@ -96,11 +97,14 @@ public Mono run(CancellationToken cancellationToken) { this.lastContinuation = documentFeedResponse.getContinuationToken(); if (documentFeedResponse.getResults() != null && documentFeedResponse.getResults().size() > 0) { return this.dispatchChanges(documentFeedResponse) - .doFinally( (Void) -> { - this.options.setRequestContinuation(this.lastContinuation); + .doOnError(throwable -> { + logger.debug("Exception was thrown from thread {}", Thread.currentThread().getId(), throwable); + }) + .doOnSuccess((Void) -> { + this.options.setRequestContinuation(this.lastContinuation); if (cancellationToken.isCancellationRequested()) throw new TaskCancelledException(); - }).flux(); + }); } this.options.setRequestContinuation(this.lastContinuation); @@ -119,7 +123,8 @@ public Mono run(CancellationToken cancellationToken) { if (throwable instanceof CosmosClientException) { CosmosClientException clientException = (CosmosClientException) throwable; - this.logger.warn("Exception: partition {}", this.options.getPartitionKey().getInternalPartitionKey(), clientException); + logger.warn("CosmosClientException: partition {} from thread {}", + this.options.getPartitionKey().getInternalPartitionKey(), Thread.currentThread().getId(), clientException); StatusCodeErrorType docDbError = ExceptionClassifier.classifyClientException(clientException); switch (docDbError) { @@ -136,12 +141,12 @@ public Mono run(CancellationToken cancellationToken) { if (this.options.getMaxItemCount() == null) { this.options.setMaxItemCount(DefaultMaxItemCount); } else if (this.options.getMaxItemCount() <= 1) { - 
this.logger.error("Cannot reduce getMaxItemCount further as it's already at {}", this.options.getMaxItemCount(), clientException); + logger.error("Cannot reduce maxItemCount further as it's already at {}", this.options.getMaxItemCount(), clientException); this.resultException = new RuntimeException(clientException); } this.options.setMaxItemCount(this.options.getMaxItemCount() / 2); - this.logger.warn("Reducing getMaxItemCount, new getValue: {}", this.options.getMaxItemCount()); + logger.warn("Reducing maxItemCount, new value: {}", this.options.getMaxItemCount()); return Flux.empty(); } case TRANSIENT_ERROR: { @@ -151,20 +156,28 @@ public Mono run(CancellationToken cancellationToken) { return Mono.just(clientException.getRetryAfterInMilliseconds()) // set some seed value to be able to run // the repeat loop .delayElement(Duration.ofMillis(100)) - .repeat( () -> { + .repeat(() -> { ZonedDateTime currentTime = ZonedDateTime.now(); return !cancellationToken.isCancellationRequested() && currentTime.isBefore(stopTimer); - }).flatMap( values -> Flux.empty()); + }).flatMap(values -> Flux.empty()); } } default: { - this.logger.error("Unrecognized DocDbError enum getValue {}", docDbError, clientException); + logger.error("Unrecognized Cosmos exception returned error code {}", docDbError, clientException); this.resultException = new RuntimeException(clientException); } } + } else if (throwable instanceof LeaseLostException) { + logger.info("LeaseLoseException with partition {} from thread {}", + this.options.getPartitionKey().getInternalPartitionKey(), Thread.currentThread().getId()); + this.resultException = (LeaseLostException) throwable; } else if (throwable instanceof TaskCancelledException) { - this.logger.debug("Exception: partition {}", this.settings.getPartitionKeyRangeId(), throwable); + logger.debug("Task cancelled exception: partition {} from {}", + this.settings.getPartitionKeyRangeId(), Thread.currentThread().getId(), throwable); this.resultException = 
(TaskCancelledException) throwable; + } else { + logger.warn("Unexpected exception from thread {}", Thread.currentThread().getId(), throwable); + this.resultException = new RuntimeException(throwable); } return Flux.error(throwable); }) @@ -176,8 +189,13 @@ public Mono run(CancellationToken cancellationToken) { return true; }) - .onErrorResume(throwable -> Flux.empty()) - .then(); + .onErrorResume(throwable -> { + if (this.resultException == null) { + this.resultException = new RuntimeException(throwable); + } + + return Flux.empty(); + }).then(); } @Override diff --git a/sdk/cosmos/changelog/README.md b/sdk/cosmos/changelog/README.md index 13882cf36838..4b23e518f08c 100644 --- a/sdk/cosmos/changelog/README.md +++ b/sdk/cosmos/changelog/README.md @@ -8,6 +8,7 @@ ### 3.3.2 - ChangeFeedProcessor; fixes and extra logging related to the creations of the lease documents. - Port consistency policy bug fix (see https://github.com/Azure/azure-cosmosdb-java/pull/196) + - ChangeFeedProcessor: fix a race condition where the lease document is updated by another worker while the current state is being written, for example when check-pointing the continuation token. ### 3.3.1 - Added @JsonIgnore on getLogger in JsonSerializable