Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package com.azure.data.cosmos.internal.changefeed;

import com.azure.data.cosmos.CosmosItemProperties;
import reactor.core.publisher.Mono;

import java.util.List;

Expand Down Expand Up @@ -30,6 +31,7 @@ public interface ChangeFeedObserver {
*
* @param context the context specifying partition for this observer, etc.
* @param docs the documents changed.
* @return a deferred operation of this call.
*/
void processChanges(ChangeFeedObserverContext context, List<CosmosItemProperties> docs);
Mono<Void> processChanges(ChangeFeedObserverContext context, List<CosmosItemProperties> docs);
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ public class ObserverException extends RuntimeException {
/**
* Initializes a new instance of the {@link ObserverException} class using the specified internal exception.
*
* @param originalException {@link Exception} thrown by the user code.
* @param originalException {@link Throwable} thrown by the user code.
*/
public ObserverException(Exception originalException) {
super(DefaultMessage, originalException.getCause());
public ObserverException(Throwable originalException) {
super(DefaultMessage, originalException);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.azure.data.cosmos.internal.changefeed.ChangeFeedObserverCloseReason;
import com.azure.data.cosmos.internal.changefeed.ChangeFeedObserverContext;
import com.azure.data.cosmos.internal.changefeed.CheckpointFrequency;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.time.ZoneId;
Expand All @@ -19,12 +20,17 @@
class AutoCheckpointer implements ChangeFeedObserver {
private final CheckpointFrequency checkpointFrequency;
private final ChangeFeedObserver observer;
private int processedDocCount;
private ZonedDateTime lastCheckpointTime;
private volatile int processedDocCount;
private volatile ZonedDateTime lastCheckpointTime;

public AutoCheckpointer(CheckpointFrequency checkpointFrequency, ChangeFeedObserver observer) {
if (checkpointFrequency == null) throw new IllegalArgumentException("checkpointFrequency");
if (observer == null) throw new IllegalArgumentException("observer");
if (checkpointFrequency == null) {
throw new IllegalArgumentException("checkpointFrequency");
}

if (observer == null) {
throw new IllegalArgumentException("observer");
}

this.checkpointFrequency = checkpointFrequency;
this.observer = observer;
Expand All @@ -42,15 +48,22 @@ public void close(ChangeFeedObserverContext context, ChangeFeedObserverCloseReas
}

@Override
public void processChanges(ChangeFeedObserverContext context, List<CosmosItemProperties> docs) {
this.observer.processChanges(context, docs);
public Mono<Void> processChanges(ChangeFeedObserverContext context, List<CosmosItemProperties> docs) {
return this.observer.processChanges(context, docs)
.then(this.afterProcessChanges(context));
}

private Mono<Void> afterProcessChanges(ChangeFeedObserverContext context) {
this.processedDocCount ++;

if (this.isCheckpointNeeded()) {
context.checkpoint().block();
this.processedDocCount = 0;
this.lastCheckpointTime = ZonedDateTime.now(ZoneId.of("UTC"));
return context.checkpoint()
.doOnSuccess((Void) -> {
this.processedDocCount = 0;
this.lastCheckpointTime = ZonedDateTime.now(ZoneId.of("UTC"));
});
}
return Mono.empty();
}

private boolean isCheckpointNeeded() {
Expand All @@ -64,10 +77,6 @@ private boolean isCheckpointNeeded() {

Duration delta = Duration.between(this.lastCheckpointTime, ZonedDateTime.now(ZoneId.of("UTC")));

if (delta.compareTo(this.checkpointFrequency.getTimeInterval()) >= 0) {
return true;
}

return false;
return delta.compareTo(this.checkpointFrequency.getTimeInterval()) >= 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,56 +21,71 @@ class BootstrapperImpl implements Bootstrapper {
private final Duration lockTime;
private final Duration sleepTime;

public BootstrapperImpl(PartitionSynchronizer synchronizer, LeaseStore leaseStore, Duration lockTime, Duration sleepTime)
{
if (synchronizer == null) throw new IllegalArgumentException("synchronizer");
if (leaseStore == null) throw new IllegalArgumentException("leaseStore");
if (lockTime == null || lockTime.isNegative() || lockTime.isZero()) throw new IllegalArgumentException("lockTime should be non-null and positive");
if (sleepTime == null || sleepTime.isNegative() || sleepTime.isZero()) throw new IllegalArgumentException("sleepTime should be non-null and positive");
private volatile boolean isInitialized;
private volatile boolean isLockAcquired;

public BootstrapperImpl(PartitionSynchronizer synchronizer, LeaseStore leaseStore, Duration lockTime, Duration sleepTime) {
if (synchronizer == null) {
throw new IllegalArgumentException("synchronizer");
}

if (leaseStore == null) {
throw new IllegalArgumentException("leaseStore");
}

if (lockTime == null || lockTime.isNegative() || lockTime.isZero()) {
throw new IllegalArgumentException("lockTime should be non-null and positive");
}

if (sleepTime == null || sleepTime.isNegative() || sleepTime.isZero()) {
throw new IllegalArgumentException("sleepTime should be non-null and positive");
}

this.synchronizer = synchronizer;
this.leaseStore = leaseStore;
this.lockTime = lockTime;
this.sleepTime = sleepTime;

this.isInitialized = false;
}

@Override
public Mono<Void> initialize() {
BootstrapperImpl self = this;
this.isInitialized = false;

return Mono.fromRunnable( () -> {
while (true) {
boolean initialized = self.leaseStore.isInitialized().block();
return Mono.just(this)
.flatMap( value -> this.leaseStore.isInitialized())
.flatMap(initialized -> {
this.isInitialized = initialized;

if (initialized) break;
if (initialized) {
return Mono.empty();
} else {
return this.leaseStore.acquireInitializationLock(this.lockTime)
.flatMap(lockAcquired -> {
this.isLockAcquired = lockAcquired;

boolean isLockAcquired = self.leaseStore.acquireInitializationLock(self.lockTime).block();

try {
if (!isLockAcquired) {
logger.info("Another instance is initializing the store");
try {
Thread.sleep(self.sleepTime.toMillis());
} catch (InterruptedException ex) {
logger.warn("Unexpected exception caught", ex);
}
continue;
}

logger.info("Initializing the store");
self.synchronizer.createMissingLeases().block();
self.leaseStore.markInitialized().block();

} catch (RuntimeException ex) {
break;
} finally {
if (isLockAcquired) {
self.leaseStore.releaseInitializationLock().block();
}
if (!this.isLockAcquired) {
logger.info("Another instance is initializing the store");
return Mono.just(isLockAcquired).delayElement(this.sleepTime);
} else {
return this.synchronizer.createMissingLeases()
.then(this.leaseStore.markInitialized());
}
})
.onErrorResume(throwable -> {
logger.warn("Unexpected exception caught", throwable);
return Mono.just(this.isLockAcquired);
})
.flatMap(lockAcquired -> {
if (this.isLockAcquired) {
return this.leaseStore.releaseInitializationLock();
}
return Mono.just(lockAcquired);
});
}
}

logger.info("The store is initialized");
});
})
.repeat( () -> !this.isInitialized)
.then();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import com.azure.data.cosmos.SqlQuerySpec;
import com.azure.data.cosmos.internal.PartitionKeyRange;
import com.azure.data.cosmos.internal.changefeed.ChangeFeedContextClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
Expand All @@ -33,6 +35,8 @@
* Implementation for ChangeFeedDocumentClient.
*/
public class ChangeFeedContextClientImpl implements ChangeFeedContextClient {
private final Logger logger = LoggerFactory.getLogger(ChangeFeedContextClientImpl.class);

private final AsyncDocumentClient documentClient;
private final CosmosContainer cosmosContainer;
private Scheduler rxScheduler;
Expand All @@ -41,8 +45,7 @@ public class ChangeFeedContextClientImpl implements ChangeFeedContextClient {
* Initializes a new instance of the {@link ChangeFeedContextClient} interface.
* @param cosmosContainer existing client.
*/
public ChangeFeedContextClientImpl(CosmosContainer cosmosContainer)
{
public ChangeFeedContextClientImpl(CosmosContainer cosmosContainer) {
if (cosmosContainer == null) {
throw new IllegalArgumentException("cosmosContainer");
}
Expand All @@ -57,8 +60,7 @@ public ChangeFeedContextClientImpl(CosmosContainer cosmosContainer)
* @param cosmosContainer existing client.
* @param rxScheduler the RX Java scheduler to observe on.
*/
public ChangeFeedContextClientImpl(CosmosContainer cosmosContainer, Scheduler rxScheduler)
{
public ChangeFeedContextClientImpl(CosmosContainer cosmosContainer, Scheduler rxScheduler) {
if (cosmosContainer == null) {
throw new IllegalArgumentException("cosmosContainer");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,7 @@ public static class KeyValuePair<K, V> implements Map.Entry<K, V>
private K key;
private V value;

public KeyValuePair(K key, V value)
{
public KeyValuePair(K key, V value) {
this.key = key;
this.value = value;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ public ChangeFeedObserverContextImpl(String leaseToken) {
this.feedResponse = null;
}

public ChangeFeedObserverContextImpl(String leaseToken, FeedResponse<CosmosItemProperties> feedResponse, PartitionCheckpointer checkpointer)
{
public ChangeFeedObserverContextImpl(String leaseToken, FeedResponse<CosmosItemProperties> feedResponse, PartitionCheckpointer checkpointer) {
this.partitionKeyRangeId = leaseToken;
this.feedResponse = feedResponse;
this.checkpointer = checkpointer;
Expand Down
Loading