Skip to content
Merged
1 change: 1 addition & 0 deletions sdk/cosmos/changelog/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
- ChangeFeedProcessor: fixes and extra logging related to the creation of the lease documents.
- Port consistency policy bug fix (see https://github.com/Azure/azure-cosmosdb-java/pull/196)
- Port test fixes (see https://github.com/Azure/azure-cosmosdb-java/pull/196)
- ChangeFeedProcessor: fix a race condition where the lease document is updated by another worker while the current worker is trying to write its own state, for example when checkpointing the continuation token.

### 3.3.1
- Added @JsonIgnore on getLogger in JsonSerializable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ public interface ChangeFeedObserverContext {
*
* @return a representation of the deferred computation of this call.
*/
Mono<Void> checkpoint();
Mono<Lease> checkpoint();
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,5 @@ public interface PartitionCheckpointer {
* @param сontinuationToken the continuation token.
* @return a deferred operation of this call.
*/
Mono<Void> checkpointPartition(String сontinuationToken);
Mono<Lease> checkpointPartition(String сontinuationToken);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.data.cosmos.internal.changefeed.exceptions;

import com.azure.data.cosmos.internal.changefeed.Lease;

/**
 * Exception raised when a lease was updated by a different thread or worker while the current
 * thread was trying to update it as well.
 * <p>
 * The exception may optionally carry the {@link Lease} associated with the conflict; see
 * {@link #getLease()}.
 */
public class LeaseConflictException extends RuntimeException {
    private static final String DEFAULT_MESSAGE = "The lease was updated by a different worker.";

    // Assigned exactly once per constructor; null when no lease was supplied.
    private final Lease lease;

    /**
     * Initializes a new instance of the {@link LeaseConflictException} class with no message and no lease.
     */
    public LeaseConflictException() {
        this.lease = null;
    }

    /**
     * Initializes a new instance of the {@link LeaseConflictException} class using the specified lease
     * and the default error message.
     *
     * @param lease the lease associated with the conflict.
     */
    public LeaseConflictException(Lease lease) {
        super(DEFAULT_MESSAGE);
        this.lease = lease;
    }

    /**
     * Initializes a new instance of the {@link LeaseConflictException} class using the specified lease
     * and error message.
     *
     * @param lease the lease associated with the conflict.
     * @param message the exception error message.
     */
    public LeaseConflictException(Lease lease, String message) {
        super(message);
        this.lease = lease;
    }

    /**
     * Initializes a new instance of the {@link LeaseConflictException} class using an error message.
     *
     * @param message the exception error message.
     */
    public LeaseConflictException(String message) {
        super(message);
        this.lease = null;
    }

    /**
     * Initializes a new instance of the {@link LeaseConflictException} class using an error message
     * and an inner exception.
     *
     * @param message the exception error message.
     * @param innerException the inner exception recorded as the cause; may be {@code null}.
     */
    public LeaseConflictException(String message, Exception innerException) {
        // Keep the inner exception itself as the cause. Passing innerException.getCause()
        // would drop it from the chain and fail with a NullPointerException on a null argument.
        super(message, innerException);
        this.lease = null;
    }

    /**
     * Gets the lease associated with the conflict.
     *
     * @return the lease supplied at construction time, or {@code null} if none was supplied.
     */
    public Lease getLease() {
        return this.lease;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,15 @@ public class LeaseLostException extends RuntimeException {
/**
* Initializes a new instance of the {@link LeaseLostException} class.
*/
public LeaseLostException()
{
public LeaseLostException() {
}

/**
* Initializes a new instance of the {@link LeaseLostException} class using the specified lease.
*
* @param lease an instance of a lost lease.
*/
public LeaseLostException(Lease lease)
{
public LeaseLostException(Lease lease) {
super(DEFAULT_MESSAGE);
this.lease = lease;
}
Expand All @@ -37,8 +35,7 @@ public LeaseLostException(Lease lease)
*
* @param message the exception error message.
*/
public LeaseLostException(String message)
{
public LeaseLostException(String message) {
super(message);
}

Expand All @@ -49,8 +46,7 @@ public LeaseLostException(String message)
* @param innerException the inner exception.
*
*/
public LeaseLostException(String message, Exception innerException)
{
public LeaseLostException(String message, Exception innerException) {
super(message, innerException.getCause());
}

Expand All @@ -62,8 +58,7 @@ public LeaseLostException(String message, Exception innerException)
* @param innerException the inner exception.
* @param isGone true if lease doesn't exist.
*/
public LeaseLostException(Lease lease, Exception innerException, boolean isGone)
{
public LeaseLostException(Lease lease, Exception innerException, boolean isGone) {
super(DEFAULT_MESSAGE, innerException.getCause());
this.lease = lease;
this.isGone = isGone;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import com.azure.data.cosmos.internal.changefeed.ChangeFeedObserverCloseReason;
import com.azure.data.cosmos.internal.changefeed.ChangeFeedObserverContext;
import com.azure.data.cosmos.internal.changefeed.CheckpointFrequency;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

import java.time.Duration;
Expand All @@ -18,6 +20,7 @@
* Auto check-pointer implementation for {@link ChangeFeedObserver}.
*/
class AutoCheckpointer implements ChangeFeedObserver {
private final Logger logger = LoggerFactory.getLogger(AutoCheckpointer.class);
private final CheckpointFrequency checkpointFrequency;
private final ChangeFeedObserver observer;
private volatile int processedDocCount;
Expand Down Expand Up @@ -50,6 +53,9 @@ public void close(ChangeFeedObserverContext context, ChangeFeedObserverCloseReas
@Override
public Mono<Void> processChanges(ChangeFeedObserverContext context, List<CosmosItemProperties> docs) {
    // Hand the batch to the wrapped observer and log (without swallowing) any failure it signals.
    Mono<Void> delegated = this.observer.processChanges(context, docs)
        .doOnError(error ->
            logger.warn("Unexpected exception from thread {}", Thread.currentThread().getId(), error));

    // The follow-up Mono is obtained here, while the chain is being built, and is
    // subscribed to only after the wrapped observer completes successfully.
    return delegated.then(this.afterProcessChanges(context));
}

Expand All @@ -58,10 +64,14 @@ private Mono<Void> afterProcessChanges(ChangeFeedObserverContext context) {

if (this.isCheckpointNeeded()) {
return context.checkpoint()
.doOnSuccess((Void) -> {
.doOnError(throwable -> {
logger.warn("Checkpoint failed; this worker will be killed", throwable);
})
.doOnSuccess( (Void) -> {
this.processedDocCount = 0;
this.lastCheckpointTime = ZonedDateTime.now(ZoneId.of("UTC"));
});
})
.then();
}
return Mono.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.azure.data.cosmos.CosmosItemProperties;
import com.azure.data.cosmos.FeedResponse;
import com.azure.data.cosmos.internal.changefeed.ChangeFeedObserverContext;
import com.azure.data.cosmos.internal.changefeed.Lease;
import com.azure.data.cosmos.internal.changefeed.PartitionCheckpointer;
import reactor.core.publisher.Mono;

Expand Down Expand Up @@ -40,7 +41,7 @@ public ChangeFeedObserverContextImpl(String leaseToken, FeedResponse<CosmosItemP
* @return a deferred computation of this call.
*/
@Override
public Mono<Void> checkpoint() {
public Mono<Lease> checkpoint() {
this.responseContinuation = this.feedResponse.continuationToken();

return this.checkpointer.checkpointPartition(this.responseContinuation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,13 @@ public void close(ChangeFeedObserverContext context, ChangeFeedObserverCloseReas
@Override
public Mono<Void> processChanges(ChangeFeedObserverContext context, List<CosmosItemProperties> docs) {
log.info("Start processing from thread {}", Thread.currentThread().getId());
consumer.accept(docs);
log.info("Done processing from thread {}", Thread.currentThread().getId());
try {
consumer.accept(docs);
log.info("Done processing from thread {}", Thread.currentThread().getId());
} catch (Exception ex) {
log.warn("Unexpected exception thrown from thread {}", Thread.currentThread().getId(), ex);
return Mono.error(ex);
}

return Mono.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.azure.data.cosmos.internal.changefeed.Lease;
import com.azure.data.cosmos.internal.changefeed.ServiceItemLease;
import com.azure.data.cosmos.internal.changefeed.ServiceItemLeaseUpdater;
import com.azure.data.cosmos.internal.changefeed.exceptions.LeaseConflictException;
import com.azure.data.cosmos.internal.changefeed.exceptions.LeaseLostException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -78,21 +79,39 @@ public Mono<Lease> updateLease(Lease cachedLease, CosmosItem itemLink, CosmosIte
CosmosItemProperties document = cosmosItemResponse.properties();
ServiceItemLease serverLease = ServiceItemLease.fromDocument(document);
logger.info(
"Partition {} update failed because the lease with token '{}' was updated by host '{}' with token '{}'.",
"Partition {} update failed because the lease with token '{}' was updated by owner '{}' with token '{}'.",
arrayLease[0].getLeaseToken(),
arrayLease[0].getConcurrencyToken(),
serverLease.getOwner(),
serverLease.getConcurrencyToken());
arrayLease[0] = serverLease;

throw new RuntimeException("Partition update failed");
throw new LeaseConflictException(arrayLease[0], "Partition update failed");
});
})
.retry(RETRY_COUNT_ON_CONFLICT, throwable -> {
if (throwable instanceof RuntimeException) {
return throwable instanceof LeaseLostException;
if (throwable instanceof LeaseConflictException) {
logger.info(
"Partition {} for the lease with token '{}' failed to update for owner '{}'; will retry.",
arrayLease[0].getLeaseToken(),
arrayLease[0].getConcurrencyToken(),
arrayLease[0].getOwner());
return true;
}
return false;
})
.onErrorResume(throwable -> {
if (throwable instanceof LeaseConflictException) {
logger.warn(
"Partition {} for the lease with token '{}' failed to update for owner '{}'; current continuation token '{}'.",
arrayLease[0].getLeaseToken(),
arrayLease[0].getConcurrencyToken(),
arrayLease[0].getOwner(),
arrayLease[0].getContinuationToken(), throwable);

return Mono.just(arrayLease[0]);
}
return Mono.error(throwable);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,12 @@ public Mono<Void> run(CancellationToken cancellationToken) {
return !cancellationToken.isCancellationRequested();
})
.then()
.onErrorResume(throwable -> {
.doOnError(throwable -> {
if (throwable instanceof LeaseLostException) {
logger.info("Partition {}: renew lease loop failed.", this.lease.getLeaseToken(), throwable);
} else {
logger.error("Partition {}: renew lease loop failed.", this.lease.getLeaseToken(), throwable);
}
return Mono.error(throwable);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,11 @@ public Mono<Lease> checkpoint(Lease lease, String continuationToken) {
serverLease.setContinuationToken(continuationToken);

return serverLease;
}));
}))
.doOnError(throwable -> {
logger.info("Partition {} lease with token '{}' failed to checkpoint for owner '{}' with continuation token '{}'",
lease.getLeaseToken(), lease.getConcurrencyToken(), lease.getOwner(), lease.getContinuationToken());
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public void open(ChangeFeedObserverContext context) {
}
catch (RuntimeException userException)
{
this.logger.warn("Exception happened on ChangeFeedObserver.open", userException);
this.logger.warn("Exception thrown during ChangeFeedObserver.open from thread {}", Thread.currentThread().getId(), userException);
throw new ObserverException(userException);
}
}
Expand All @@ -44,17 +44,16 @@ public void close(ChangeFeedObserverContext context, ChangeFeedObserverCloseReas
}
catch (RuntimeException userException)
{
this.logger.warn("Exception happened on ChangeFeedObserver.close", userException);
this.logger.warn("Exception thrown during ChangeFeedObserver.close from thread {}", Thread.currentThread().getId(), userException);
throw new ObserverException(userException);
}
}

@Override
public Mono<Void> processChanges(ChangeFeedObserverContext context, List<CosmosItemProperties> docs) {
return this.changeFeedObserver.processChanges(context, docs)
.onErrorResume(throwable -> {
this.logger.warn("Exception happened on ChangeFeedObserver.processChanges", throwable);
return Mono.error(new ObserverException(throwable));
.doOnError(throwable -> {
this.logger.warn("Exception thrown during ChangeFeedObserver.processChanges from thread {}", Thread.currentThread().getId(), throwable);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,12 @@ public PartitionCheckpointerImpl(LeaseCheckpointer leaseCheckpointer, Lease leas
}

@Override
public Mono<Void> checkpointPartition(String сontinuationToken) {
public Mono<Lease> checkpointPartition(String сontinuationToken) {
return this.leaseCheckpointer.checkpoint(this.lease, сontinuationToken)
.map(lease1 -> {
this.lease = lease1;
logger.info("Checkpoint: partition {}, new continuation {}", this.lease.getLeaseToken(), this.lease.getContinuationToken());
return lease1;
})
.then();
});
}
}
Loading