Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ public interface ChangeFeedObserverContext {
*
* @return a representation of the deferred computation of this call.
*/
Mono<Void> checkpoint();
Mono<Lease> checkpoint();
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,5 @@ public interface PartitionCheckpointer {
* @param сontinuationToken the continuation token.
* @return a deferred operation of this call.
*/
Mono<Void> checkpointPartition(String сontinuationToken);
Mono<Lease> checkpointPartition(String сontinuationToken);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.internal.changefeed.exceptions;

import com.azure.cosmos.internal.changefeed.Lease;

/**
* Exception occurred when the lease was updated by a different thread or worker while current thread is trying to update it as well.
*/
public class LeaseConflictException extends RuntimeException {
private static final String DEFAULT_MESSAGE = "The lease was updated by a different worker.";
private Lease lease;

/**
* Initializes a new instance of the @link LeaseConflictException} class.
*/
public LeaseConflictException() {
}

/**
* Initializes a new instance of the @link LeaseConflictException} class using the specified lease.
*
* @param lease an instance of a lost lease.
*/
public LeaseConflictException(Lease lease) {
super(DEFAULT_MESSAGE);
this.lease = lease;
}

/**
* Initializes a new instance of the @link LeaseConflictException} class using the specified lease.
*
* @param lease an instance of a lost lease.
* @param message the exception error message.
*/
public LeaseConflictException(Lease lease, String message) {
super(message);
this.lease = lease;
}

/**
* Initializes a new instance of the @link LeaseConflictException} class using error message.
*
* @param message the exception error message.
*/
public LeaseConflictException(String message) {
super(message);
}

/**
* Initializes a new instance of the @link LeaseConflictException} class using error message and inner exception.
*
* @param message the exception error message.
* @param innerException the inner exception.
*
*/
public LeaseConflictException(String message, Exception innerException) {
super(message, innerException.getCause());
}

/**
* Gets the lost lease.
*
* @return the lost lease.
*/
public Lease getLease() {
return this.lease;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,15 @@ public class LeaseLostException extends RuntimeException {
/**
* Initializes a new instance of the @link LeaseLostException} class.
*/
public LeaseLostException()
{
public LeaseLostException() {
}

/**
* Initializes a new instance of the @link LeaseLostException} class using the specified lease.
*
* @param lease an instance of a lost lease.
*/
public LeaseLostException(Lease lease)
{
public LeaseLostException(Lease lease) {
super(DEFAULT_MESSAGE);
this.lease = lease;
}
Expand All @@ -37,8 +35,7 @@ public LeaseLostException(Lease lease)
*
* @param message the exception error message.
*/
public LeaseLostException(String message)
{
public LeaseLostException(String message) {
super(message);
}

Expand All @@ -49,8 +46,7 @@ public LeaseLostException(String message)
* @param innerException the inner exception.
*
*/
public LeaseLostException(String message, Exception innerException)
{
public LeaseLostException(String message, Exception innerException) {
super(message, innerException.getCause());
}

Expand All @@ -62,8 +58,7 @@ public LeaseLostException(String message, Exception innerException)
* @param innerException the inner exception.
* @param isGone true if lease doesn't exist.
*/
public LeaseLostException(Lease lease, Exception innerException, boolean isGone)
{
public LeaseLostException(Lease lease, Exception innerException, boolean isGone) {
super(DEFAULT_MESSAGE, innerException.getCause());
this.lease = lease;
this.isGone = isGone;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import com.azure.cosmos.internal.changefeed.ChangeFeedObserverCloseReason;
import com.azure.cosmos.internal.changefeed.ChangeFeedObserverContext;
import com.azure.cosmos.internal.changefeed.CheckpointFrequency;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

import java.time.Duration;
Expand All @@ -18,6 +20,7 @@
* Auto check-pointer implementation for {@link ChangeFeedObserver}.
*/
class AutoCheckpointer implements ChangeFeedObserver {
private final Logger logger = LoggerFactory.getLogger(AutoCheckpointer.class);
private final CheckpointFrequency checkpointFrequency;
private final ChangeFeedObserver observer;
private volatile int processedDocCount;
Expand Down Expand Up @@ -50,6 +53,9 @@ public void close(ChangeFeedObserverContext context, ChangeFeedObserverCloseReas
@Override
public Mono<Void> processChanges(ChangeFeedObserverContext context, List<CosmosItemProperties> docs) {
return this.observer.processChanges(context, docs)
.doOnError(throwable -> {
logger.warn("Unexpected exception from thread {}", Thread.currentThread().getId(), throwable);
})
.then(this.afterProcessChanges(context));
}

Expand All @@ -58,10 +64,14 @@ private Mono<Void> afterProcessChanges(ChangeFeedObserverContext context) {

if (this.isCheckpointNeeded()) {
return context.checkpoint()
.doOnError(throwable -> {
logger.warn("Checkpoint failed; this worker will be killed", throwable);
})
.doOnSuccess((Void) -> {
this.processedDocCount = 0;
this.lastCheckpointTime = ZonedDateTime.now(ZoneId.of("UTC"));
});
})
.then();
}
return Mono.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.azure.cosmos.CosmosItemProperties;
import com.azure.cosmos.FeedResponse;
import com.azure.cosmos.internal.changefeed.ChangeFeedObserverContext;
import com.azure.cosmos.internal.changefeed.Lease;
import com.azure.cosmos.internal.changefeed.PartitionCheckpointer;
import reactor.core.publisher.Mono;

Expand Down Expand Up @@ -40,7 +41,7 @@ public ChangeFeedObserverContextImpl(String leaseToken, FeedResponse<CosmosItemP
* @return a deferred computation of this call.
*/
@Override
public Mono<Void> checkpoint() {
public Mono<Lease> checkpoint() {
this.responseContinuation = this.feedResponse.getContinuationToken();

return this.checkpointer.checkpointPartition(this.responseContinuation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,13 @@ public void close(ChangeFeedObserverContext context, ChangeFeedObserverCloseReas
@Override
public Mono<Void> processChanges(ChangeFeedObserverContext context, List<CosmosItemProperties> docs) {
log.info("Start processing from thread {}", Thread.currentThread().getId());
consumer.accept(docs);
log.info("Done processing from thread {}", Thread.currentThread().getId());

try {
consumer.accept(docs);
log.info("Done processing from thread {}", Thread.currentThread().getId());
} catch (Exception ex) {
log.warn("Unexpected exception thrown from thread {}", Thread.currentThread().getId(), ex);
return Mono.error(ex);
}
return Mono.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.azure.cosmos.internal.changefeed.Lease;
import com.azure.cosmos.internal.changefeed.ServiceItemLease;
import com.azure.cosmos.internal.changefeed.ServiceItemLeaseUpdater;
import com.azure.cosmos.internal.changefeed.exceptions.LeaseConflictException;
import com.azure.cosmos.internal.changefeed.exceptions.LeaseLostException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -78,21 +79,39 @@ public Mono<Lease> updateLease(Lease cachedLease, CosmosAsyncItem itemLink, Cosm
CosmosItemProperties document = cosmosItemResponse.getProperties();
ServiceItemLease serverLease = ServiceItemLease.fromDocument(document);
logger.info(
"Partition {} update failed because the lease with token '{}' was updated by host '{}' with token '{}'.",
"Partition {} update failed because the lease with token '{}' was updated by owner '{}' with token '{}'.",
arrayLease[0].getLeaseToken(),
arrayLease[0].getConcurrencyToken(),
serverLease.getOwner(),
serverLease.getConcurrencyToken());
arrayLease[0] = serverLease;

throw new RuntimeException("Partition update failed");
throw new LeaseConflictException(arrayLease[0], "Partition update failed");
});
})
.retry(RETRY_COUNT_ON_CONFLICT, throwable -> {
if (throwable instanceof RuntimeException) {
return throwable instanceof LeaseLostException;
if (throwable instanceof LeaseConflictException) {
logger.info(
"Partition {} for the lease with token '{}' failed to update for owner '{}'; will retry.",
arrayLease[0].getLeaseToken(),
arrayLease[0].getConcurrencyToken(),
arrayLease[0].getOwner());
return true;
}
return false;
})
.onErrorResume(throwable -> {
if (throwable instanceof LeaseConflictException) {
logger.warn(
"Partition {} for the lease with token '{}' failed to update for owner '{}'; current continuation token '{}'.",
arrayLease[0].getLeaseToken(),
arrayLease[0].getConcurrencyToken(),
arrayLease[0].getOwner(),
arrayLease[0].getContinuationToken(), throwable);

return Mono.just(arrayLease[0]);
}
return Mono.error(throwable);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,12 @@ public Mono<Void> run(CancellationToken cancellationToken) {
return !cancellationToken.isCancellationRequested();
})
.then()
.onErrorResume(throwable -> {
.doOnError(throwable -> {
if (throwable instanceof LeaseLostException) {
logger.info("Partition {}: renew lease loop failed.", this.lease.getLeaseToken(), throwable);
} else {
logger.error("Partition {}: renew lease loop failed.", this.lease.getLeaseToken(), throwable);
}
return Mono.error(throwable);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,11 @@ public Mono<Lease> checkpoint(Lease lease, String continuationToken) {
serverLease.setContinuationToken(continuationToken);

return serverLease;
}));
}))
.doOnError(throwable -> {
logger.info("Partition {} lease with token '{}' failed to checkpoint for owner '{}' with continuation token '{}'",
lease.getLeaseToken(), lease.getConcurrencyToken(), lease.getOwner(), lease.getContinuationToken());
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public void open(ChangeFeedObserverContext context) {
}
catch (RuntimeException userException)
{
this.logger.warn("Exception happened on ChangeFeedObserver.open", userException);
this.logger.warn("Exception thrown during ChangeFeedObserver.open from thread {}", Thread.currentThread().getId(), userException);
throw new ObserverException(userException);
}
}
Expand All @@ -44,17 +44,16 @@ public void close(ChangeFeedObserverContext context, ChangeFeedObserverCloseReas
}
catch (RuntimeException userException)
{
this.logger.warn("Exception happened on ChangeFeedObserver.close", userException);
this.logger.warn("Exception thrown during ChangeFeedObserver.close from thread {}", Thread.currentThread().getId(), userException);
throw new ObserverException(userException);
}
}

@Override
public Mono<Void> processChanges(ChangeFeedObserverContext context, List<CosmosItemProperties> docs) {
return this.changeFeedObserver.processChanges(context, docs)
.onErrorResume(throwable -> {
this.logger.warn("Exception happened on ChangeFeedObserver.processChanges", throwable);
return Mono.error(new ObserverException(throwable));
.doOnError(throwable -> {
this.logger.warn("Exception thrown during ChangeFeedObserver.processChanges from thread {}", Thread.currentThread().getId(), throwable);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,12 @@ public PartitionCheckpointerImpl(LeaseCheckpointer leaseCheckpointer, Lease leas
}

@Override
public Mono<Void> checkpointPartition(String сontinuationToken) {
public Mono<Lease> checkpointPartition(String сontinuationToken) {
return this.leaseCheckpointer.checkpoint(this.lease, сontinuationToken)
.map(lease1 -> {
this.lease = lease1;
logger.info("Checkpoint: partition {}, new continuation {}", this.lease.getLeaseToken(), this.lease.getContinuationToken());
return lease1;
})
.then();
});
}
}
Loading