Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package com.azure.data.cosmos.internal.changefeed;

import com.azure.data.cosmos.CosmosItemProperties;
import reactor.core.publisher.Mono;

import java.util.List;

Expand Down Expand Up @@ -30,6 +31,7 @@ public interface ChangeFeedObserver {
*
* @param context the context specifying partition for this observer, etc.
* @param docs the documents changed.
* @return a deferred operation of this call.
*/
void processChanges(ChangeFeedObserverContext context, List<CosmosItemProperties> docs);
Mono<Void> processChanges(ChangeFeedObserverContext context, List<CosmosItemProperties> docs);
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ public class ObserverException extends RuntimeException {
/**
* Initializes a new instance of the {@link ObserverException} class using the specified internal exception.
*
* @param originalException {@link Exception} thrown by the user code.
* @param originalException {@link Throwable} thrown by the user code.
*/
public ObserverException(Exception originalException) {
super(DefaultMessage, originalException.getCause());
public ObserverException(Throwable originalException) {
Comment thread
milismsft marked this conversation as resolved.
super(DefaultMessage, originalException);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.azure.data.cosmos.internal.changefeed.ChangeFeedObserverCloseReason;
import com.azure.data.cosmos.internal.changefeed.ChangeFeedObserverContext;
import com.azure.data.cosmos.internal.changefeed.CheckpointFrequency;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.time.ZoneId;
Expand Down Expand Up @@ -42,15 +43,22 @@ public void close(ChangeFeedObserverContext context, ChangeFeedObserverCloseReas
}

@Override
public void processChanges(ChangeFeedObserverContext context, List<CosmosItemProperties> docs) {
this.observer.processChanges(context, docs);
public Mono<Void> processChanges(ChangeFeedObserverContext context, List<CosmosItemProperties> docs) {
return this.observer.processChanges(context, docs)
.then(this.afterProcessChanges(context));
}

private Mono<Void> afterProcessChanges(ChangeFeedObserverContext context) {
Comment thread
milismsft marked this conversation as resolved.
this.processedDocCount ++;

if (this.isCheckpointNeeded()) {
context.checkpoint().block();
this.processedDocCount = 0;
this.lastCheckpointTime = ZonedDateTime.now(ZoneId.of("UTC"));
return context.checkpoint()
.doOnSuccess((Void) -> {
this.processedDocCount = 0;
this.lastCheckpointTime = ZonedDateTime.now(ZoneId.of("UTC"));
});
}
return Mono.empty();
}

private boolean isCheckpointNeeded() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ class BootstrapperImpl implements Bootstrapper {
private final Duration lockTime;
private final Duration sleepTime;

private boolean isInitialized;
private boolean isLockAcquired;

public BootstrapperImpl(PartitionSynchronizer synchronizer, LeaseStore leaseStore, Duration lockTime, Duration sleepTime)
{
if (synchronizer == null) throw new IllegalArgumentException("synchronizer");
Expand All @@ -32,45 +35,47 @@ public BootstrapperImpl(PartitionSynchronizer synchronizer, LeaseStore leaseStor
this.leaseStore = leaseStore;
this.lockTime = lockTime;
this.sleepTime = sleepTime;

this.isInitialized = false;
}

@Override
public Mono<Void> initialize() {
BootstrapperImpl self = this;

return Mono.fromRunnable( () -> {
while (true) {
boolean initialized = self.leaseStore.isInitialized().block();
this.isInitialized = false;

if (initialized) break;
return Mono.just(this)
.flatMap( value -> this.leaseStore.isInitialized())
.flatMap(initialized -> {
this.isInitialized = initialized;

boolean isLockAcquired = self.leaseStore.acquireInitializationLock(self.lockTime).block();
if (initialized) {
return Mono.empty();
} else {
return this.leaseStore.acquireInitializationLock(this.lockTime)
.flatMap(lockAcquired -> {
this.isLockAcquired = lockAcquired;
Comment thread
milismsft marked this conversation as resolved.

try {
if (!isLockAcquired) {
logger.info("Another instance is initializing the store");
try {
Thread.sleep(self.sleepTime.toMillis());
} catch (InterruptedException ex) {
logger.warn("Unexpected exception caught", ex);
}
continue;
}

logger.info("Initializing the store");
self.synchronizer.createMissingLeases().block();
self.leaseStore.markInitialized().block();

} catch (RuntimeException ex) {
break;
} finally {
if (isLockAcquired) {
self.leaseStore.releaseInitializationLock().block();
}
if (!this.isLockAcquired) {
logger.info("Another instance is initializing the store");
return Mono.just(isLockAcquired).delayElement(this.sleepTime);
} else {
return this.synchronizer.createMissingLeases()
.then(this.leaseStore.markInitialized());
}
})
.onErrorResume(throwable -> {
logger.warn("Unexpected exception caught", throwable);
return Mono.just(this.isLockAcquired);
})
.flatMap(lockAcquired -> {
if (this.isLockAcquired) {
return this.leaseStore.releaseInitializationLock();
}
return Mono.just(lockAcquired);
});
}
}

logger.info("The store is initialized");
});
})
.repeat( () -> !this.isInitialized)
.then();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import com.azure.data.cosmos.SqlQuerySpec;
import com.azure.data.cosmos.internal.PartitionKeyRange;
import com.azure.data.cosmos.internal.changefeed.ChangeFeedContextClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
Expand All @@ -33,6 +35,8 @@
* Implementation for ChangeFeedDocumentClient.
*/
public class ChangeFeedContextClientImpl implements ChangeFeedContextClient {
private final Logger logger = LoggerFactory.getLogger(ChangeFeedContextClientImpl.class);
Comment thread
milismsft marked this conversation as resolved.

private final AsyncDocumentClient documentClient;
private final CosmosContainer cosmosContainer;
private Scheduler rxScheduler;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,18 @@ public class ChangeFeedProcessorBuilderImpl implements ChangeFeedProcessor.Build
*/
@Override
public Mono<Void> start() {
return partitionManager.start();
if (this.partitionManager == null) {
return this.initializeCollectionPropertiesForBuild()
.then(this.getLeaseStoreManager()
.flatMap(leaseStoreManager -> this.buildPartitionManager(leaseStoreManager)))
.flatMap(partitionManager1 -> {
this.partitionManager = partitionManager1;
return this.partitionManager.start();
});

} else {
return partitionManager.start();
}
}

/**
Expand Down Expand Up @@ -278,8 +289,6 @@ public ChangeFeedProcessorBuilderImpl withHealthMonitor(HealthMonitor healthMoni
*/
@Override
public ChangeFeedProcessor build() {
ChangeFeedProcessorBuilderImpl self = this;

if (this.hostName == null)
{
throw new IllegalArgumentException("Host name was not specified");
Expand All @@ -294,13 +303,7 @@ public ChangeFeedProcessor build() {
this.executorService = Executors.newCachedThreadPool();
}

// TBD: Move this initialization code as part of the start() call.
return this.initializeCollectionPropertiesForBuild()
.then(self.getLeaseStoreManager().flatMap(leaseStoreManager -> self.buildPartitionManager(leaseStoreManager)))
.map(partitionManager1 -> {
self.partitionManager = partitionManager1;
return self;
}).block();
return this;
}

public ChangeFeedProcessorBuilderImpl() {
Expand All @@ -313,30 +316,26 @@ public ChangeFeedProcessorBuilderImpl(PartitionManager partitionManager) {
}

private Mono<Void> initializeCollectionPropertiesForBuild() {
ChangeFeedProcessorBuilderImpl self = this;

if (this.changeFeedProcessorOptions == null) {
this.changeFeedProcessorOptions = new ChangeFeedProcessorOptions();
}

return this.feedContextClient
.readDatabase(this.feedContextClient.getDatabaseClient(), null)
.map( databaseResourceResponse -> {
self.databaseResourceId = databaseResourceResponse.database().id();
return self.databaseResourceId;
this.databaseResourceId = databaseResourceResponse.database().id();
Comment thread
milismsft marked this conversation as resolved.
return this.databaseResourceId;
})
.flatMap( id -> self.feedContextClient
.readContainer(self.feedContextClient.getContainerClient(), null)
.flatMap( id -> this.feedContextClient
.readContainer(this.feedContextClient.getContainerClient(), null)
.map(documentCollectionResourceResponse -> {
self.collectionResourceId = documentCollectionResourceResponse.container().id();
return self.collectionResourceId;
this.collectionResourceId = documentCollectionResourceResponse.container().id();
return this.collectionResourceId;
}))
.then();
}

private Mono<LeaseStoreManager> getLeaseStoreManager() {
ChangeFeedProcessorBuilderImpl self = this;

if (this.leaseStoreManager == null) {

return this.leaseContextClient.readContainerSettings(this.leaseContextClient.getContainerClient(), null)
Expand All @@ -352,18 +351,18 @@ private Mono<LeaseStoreManager> getLeaseStoreManager() {

RequestOptionsFactory requestOptionsFactory = new PartitionedByIdCollectionRequestOptionsFactory();

String leasePrefix = self.getLeasePrefix();
String leasePrefix = this.getLeasePrefix();

return LeaseStoreManager.Builder()
.leasePrefix(leasePrefix)
.leaseCollectionLink(self.leaseContextClient.getContainerClient())
.leaseContextClient(self.leaseContextClient)
.leaseCollectionLink(this.leaseContextClient.getContainerClient())
.leaseContextClient(this.leaseContextClient)
.requestOptionsFactory(requestOptionsFactory)
.hostName(self.hostName)
.hostName(this.hostName)
.build()
.map(manager -> {
self.leaseStoreManager = manager;
return self.leaseStoreManager;
this.leaseStoreManager = manager;
return this.leaseStoreManager;
});
});
}
Expand All @@ -389,8 +388,6 @@ private String getLeasePrefix() {
}

private Mono<PartitionManager> buildPartitionManager(LeaseStoreManager leaseStoreManager) {
ChangeFeedProcessorBuilderImpl self = this;

CheckpointerObserverFactory factory = new CheckpointerObserverFactory(this.observerFactory, new CheckpointFrequency());

PartitionSynchronizerImpl synchronizer = new PartitionSynchronizerImpl(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@
import com.azure.data.cosmos.internal.changefeed.ChangeFeedObserverContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

import java.util.List;
import java.util.function.Consumer;

class DefaultObserver implements ChangeFeedObserver {
private final Logger log = LoggerFactory.getLogger(DefaultObserver.class);
private static final Logger log = LoggerFactory.getLogger(DefaultObserver.class);
private Consumer<List<CosmosItemProperties>> consumer;
Comment thread
milismsft marked this conversation as resolved.
Outdated

public DefaultObserver(Consumer<List<CosmosItemProperties>> consumer) {
Expand All @@ -31,9 +32,11 @@ public void close(ChangeFeedObserverContext context, ChangeFeedObserverCloseReas
}

@Override
public void processChanges(ChangeFeedObserverContext context, List<CosmosItemProperties> docs) {
public Mono<Void> processChanges(ChangeFeedObserverContext context, List<CosmosItemProperties> docs) {
log.info("Start processing from thread {}", Thread.currentThread().getId());
consumer.accept(docs);
log.info("Done processing from thread {}", Thread.currentThread().getId());

return Mono.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,10 @@ public Mono<Boolean> acquireInitializationLock(Duration lockExpirationTime) {
containerDocument.id(lockId);
BridgeInternal.setProperty(containerDocument, com.azure.data.cosmos.internal.Constants.Properties.TTL, Long.valueOf(lockExpirationTime.getSeconds()).intValue());

DocumentServiceLeaseStore self = this;

return this.client.createItem(this.leaseCollectionLink, containerDocument, null, false)
.map(documentResourceResponse -> {
if (documentResourceResponse.item() != null) {
self.lockETag = documentResourceResponse.properties().etag();
this.lockETag = documentResourceResponse.properties().etag();
Comment thread
milismsft marked this conversation as resolved.
return true;
} else {
return false;
Expand Down Expand Up @@ -130,13 +128,12 @@ public Mono<Boolean> releaseInitializationLock() {
accessCondition.type(AccessConditionType.IF_MATCH);
accessCondition.condition(this.lockETag);
requestOptions.accessCondition(accessCondition);
DocumentServiceLeaseStore self = this;

CosmosItem docItem = this.client.getContainerClient().getItem(lockId, "/id");
return this.client.deleteItem(docItem, requestOptions)
.map(documentResourceResponse -> {
if (documentResourceResponse.item() != null) {
self.lockETag = null;
this.lockETag = null;
return true;
} else {
return false;
Expand Down
Loading