Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -2,40 +2,41 @@
// Licensed under the MIT License.
package com.azure.cosmos;

import com.azure.cosmos.implementation.changefeed.ChangeFeedObserver;
import com.azure.cosmos.implementation.changefeed.implementation.ChangeFeedProcessorBuilderImpl;
import com.azure.cosmos.models.ChangeFeedProcessorOptions;
import com.fasterxml.jackson.databind.JsonNode;
import reactor.core.publisher.Mono;

import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/**
* Simple host for distributing change feed events across observers and thus allowing these observers scale.
* It distributes the load across its instances and allows dynamic scaling:
* - Partitions in partitioned collections are distributed across instances/observers.
* - New instance takes leases from existing instances to make distribution equal.
* - If an instance dies, the leases are distributed across remaining instances.
* It's useful for scenario when partition count is high so that one host/VM is not capable of processing that many
* change feed events.
* Client application needs to implement {@link ChangeFeedObserver} and register processor implementation with
* {@link ChangeFeedProcessor}.
* Simple host for distributing change feed events across observers, simplifying the process of reading the change feeds
* and distributing the processing events across multiple consumers effectively.
* <p>
* It uses auxiliary document collection for managing leases for a partition.
* Every EventProcessorHost instance is performing the following two tasks:
* 1) Renew Leases: It keeps track of leases currently owned by the host and continuously keeps on renewing the leases.
* 2) Acquire Leases: Each instance continuously polls all leases to check if there are any leases it should acquire
* for the system to get into balanced state.
* There are four main components of implementing the change feed processor:
* - The monitored container: the monitored container has the data from which the change feed is generated. Any inserts
* and updates to the monitored container are reflected in the change feed of the container.
* - The lease container: the lease container acts as a state storage and coordinates processing the change feed across
* multiple workers. The lease container can be stored in the same account as the monitored container or in a
* separate account.
* - The host: a host is an application instance that uses the change feed processor to listen for changes. Multiple
* instances with the same lease configuration can run in parallel, but each instance should have a different
* instance name.
* - The delegate: the delegate is the code that defines what you, the developer, want to do with each batch of
* changes that the change feed processor reads.
* <p>
* {@code
* ChangeFeedProcessor changeFeedProcessor = ChangeFeedProcessor.Builder()
* .hostName(hostName)
* .feedContainer(feedContainer)
* .leaseContainer(leaseContainer)
* .handleChanges(docs -> {
* // Implementation for handling and processing CosmosItemProperties list goes here
* })
* for (JsonNode item : docs) {
* // Implementation for handling and processing of each JsonNode item goes here
* }
* })
* .build();
* }
*/
Expand Down Expand Up @@ -63,17 +64,31 @@ public interface ChangeFeedProcessor {
boolean isStarted();

/**
* Helper static method to buildAsyncClient {@link ChangeFeedProcessor} instances
* as logical representation of the Azure Cosmos DB database service.
* Returns the current owner (host) and an approximation of the difference between the last processed item (defined
* by the state of the feed container) and the latest change in the container for each partition (lease
* document).
* <p>
* An empty map will be returned if the processor was not started or no lease documents matching the current
* {@link ChangeFeedProcessor} instance's lease prefix could be found.
*
* @return a map representing the current owner and lease token, the current LSN and latest LSN, and the estimated
* lag, asynchronously.
*/
Mono<Map<String, Integer>> getEstimatedLag();

/**
* Helper static method to build a {@link ChangeFeedProcessor} instance.
* <p>
* {@code
* ChangeFeedProcessor changeFeedProcessor = ChangeFeedProcessor.Builder()
* .hostName(hostName)
* .feedContainer(feedContainer)
* .leaseContainer(leaseContainer)
* .handleChanges(docs -> {
* // Implementation for handling and processing CosmosItemProperties list goes here
* })
* for (JsonNode item : docs) {
* // Implementation for handling and processing of each JsonNode item goes here
* }
* })
* .build();
* }
* @return a changeFeedProcessorBuilder definition instance.
Expand Down Expand Up @@ -121,8 +136,17 @@ interface BuilderDefinition {

/**
* Sets a consumer function which will be called to process changes.
* <p>
* {@code
* An example for how this will look like:
* .handleChanges(docs -> {
* for (JsonNode item : docs) {
* // Implementation for handling and processing of each JsonNode item goes here
* }
* })
* }
*
* @param consumer the consumer of {@link ChangeFeedObserver} to call for handling the feeds.
* @param consumer the {@link Consumer} to call for handling the feeds.
* @return current Builder.
*/
BuilderDefinition handleChanges(Consumer<List<JsonNode>> consumer);
Expand All @@ -136,7 +160,7 @@ interface BuilderDefinition {
BuilderDefinition leaseContainer(CosmosAsyncContainer leaseContainer);

/**
* Builds a new instance of the {@link ChangeFeedProcessor} with the specified configuration asynchronously.
* Builds a new instance of the {@link ChangeFeedProcessor} with the specified configuration.
*
* @return an instance of {@link ChangeFeedProcessor}.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,54 +85,54 @@ AsyncDocumentClient getContextClient() {
}

/**
* Monitor Cosmos client performance and resource utilization using the specified meter registry
* Monitor Cosmos client performance and resource utilization using the specified meter registry.
*
* @param registry meter registry to use for performance monitoring
* @param registry meter registry to use for performance monitoring.
*/
static void setMonitorTelemetry(MeterRegistry registry) {
RntbdMetrics.add(registry);
}

/**
* Get the service endpoint
* Get the service endpoint.
*
* @return the service endpoint
* @return the service endpoint.
*/
String getServiceEndpoint() {
return serviceEndpoint;
}

/**
* Gets the key or resource token
* Gets the key or resource token.
*
* @return get the key or resource token
* @return get the key or resource token.
*/
String getKeyOrResourceToken() {
return keyOrResourceToken;
}

/**
* Get the connection policy
* Get the connection policy.
*
* @return {@link ConnectionPolicy}
* @return {@link ConnectionPolicy}.
*/
ConnectionPolicy getConnectionPolicy() {
return connectionPolicy;
}

/**
* Gets the consistency level
* Gets the consistency level.
*
* @return the (@link ConsistencyLevel)
* @return the (@link ConsistencyLevel).
*/
ConsistencyLevel getDesiredConsistencyLevel() {
return desiredConsistencyLevel;
}

/**
* Gets the permission list
* Gets the permission list.
*
* @return the permission list
* @return the permission list.
*/
List<CosmosPermissionProperties> getPermissions() {
return permissions;
Expand All @@ -143,27 +143,27 @@ AsyncDocumentClient getDocClientWrapper() {
}

/**
* Gets the configs
* Gets the configs.
*
* @return the configs
* @return the configs.
*/
Configs getConfigs() {
return configs;
}

/**
* Gets the token resolver
* Gets the token resolver.
*
* @return the token resolver
* @return the token resolver.
*/
CosmosAuthorizationTokenResolver getCosmosAuthorizationTokenResolver() {
return cosmosAuthorizationTokenResolver;
}

/**
* Gets the azure key credential
* Gets the azure key credential.
*
* @return azure key credential
* @return azure key credential.
*/
AzureKeyCredential credential() {
return credential;
Expand All @@ -179,19 +179,19 @@ AzureKeyCredential credential() {
*
* By-default, this is false.
*
* @return a boolean indicating whether resource will be included in the response or not
* @return a boolean indicating whether resource will be included in the response or not.
*/
boolean isContentResponseOnWriteEnabled() {
return contentResponseOnWriteEnabled;
}

/**
* CREATE a Database if it does not already exist on the service
* CREATE a Database if it does not already exist on the service.
* <p>
* The {@link Mono} upon successful completion will contain a single cosmos database response with the
* created or existing database.
*
* @param databaseSettings CosmosDatabaseProperties
* @param databaseSettings CosmosDatabaseProperties.
* @return a {@link Mono} containing the cosmos database response with the created or existing database or
* an error.
*/
Expand All @@ -200,13 +200,14 @@ public Mono<CosmosAsyncDatabaseResponse> createDatabaseIfNotExists(CosmosDatabas
}

/**
* CREATE a Database if it does not already exist on the service
* Create a Database if it does not already exist on the service.
* <p>
* The {@link Mono} upon successful completion will contain a single cosmos database response with the
* created or existing database.
*
* @param id the id of the database
* @param id the id of the database.
* @return a {@link Mono} containing the cosmos database response with the created or existing database or
* an error
* an error.
*/
public Mono<CosmosAsyncDatabaseResponse> createDatabaseIfNotExists(String id) {
return createDatabaseIfNotExistsInternal(getDatabase(id));
Expand All @@ -226,6 +227,32 @@ private Mono<CosmosAsyncDatabaseResponse> createDatabaseIfNotExistsInternal(Cosm
});
}

/**
* Create a Database if it does not already exist on the service.
* <p>
* The {@link Mono} upon successful completion will contain a single cosmos database response with the
* created or existing database.
*
* @param id the id.
* @param throughputProperties the throughputProperties.
* @return the mono.
*/
public Mono<CosmosAsyncDatabaseResponse> createDatabaseIfNotExists(String id, ThroughputProperties throughputProperties) {
return this.getDatabase(id).read().onErrorResume(exception -> {
final Throwable unwrappedException = Exceptions.unwrap(exception);
if (unwrappedException instanceof CosmosException) {
final CosmosException cosmosException = (CosmosException) unwrappedException;
if (cosmosException.getStatusCode() == HttpConstants.StatusCodes.NOTFOUND) {
CosmosDatabaseRequestOptions options = new CosmosDatabaseRequestOptions();
ModelBridgeInternal.setOfferProperties(options, throughputProperties);
return createDatabase(new CosmosDatabaseProperties(id),
options);
}
}
return Mono.error(unwrappedException);
});
}

/**
* Creates a database.
* <p>
Expand All @@ -234,8 +261,8 @@ private Mono<CosmosAsyncDatabaseResponse> createDatabaseIfNotExistsInternal(Cosm
* created database.
* In case of failure the {@link Mono} will error.
*
* @param databaseSettings {@link CosmosDatabaseProperties}
* @param options {@link CosmosDatabaseRequestOptions}
* @param databaseSettings {@link CosmosDatabaseProperties}.
* @param options {@link CosmosDatabaseRequestOptions}.
* @return an {@link Mono} containing the single cosmos database response with the created database or an error.
*/
public Mono<CosmosAsyncDatabaseResponse> createDatabase(CosmosDatabaseProperties databaseSettings,
Expand All @@ -259,7 +286,7 @@ public Mono<CosmosAsyncDatabaseResponse> createDatabase(CosmosDatabaseProperties
* created database.
* In case of failure the {@link Mono} will error.
*
* @param databaseSettings {@link CosmosDatabaseProperties}
* @param databaseSettings {@link CosmosDatabaseProperties}.
* @return an {@link Mono} containing the single cosmos database response with the created database or an error.
*/
public Mono<CosmosAsyncDatabaseResponse> createDatabase(CosmosDatabaseProperties databaseSettings) {
Expand All @@ -274,7 +301,7 @@ public Mono<CosmosAsyncDatabaseResponse> createDatabase(CosmosDatabaseProperties
* created database.
* In case of failure the {@link Mono} will error.
*
* @param id id of the database
* @param id id of the database.
* @return a {@link Mono} containing the single cosmos database response with the created database or an error.
*/
public Mono<CosmosAsyncDatabaseResponse> createDatabase(String id) {
Expand All @@ -289,9 +316,9 @@ public Mono<CosmosAsyncDatabaseResponse> createDatabase(String id) {
* created database.
* In case of failure the {@link Mono} will error.
*
* @param databaseSettings {@link CosmosDatabaseProperties}
* @param throughput the throughput for the database
* @param options {@link CosmosDatabaseRequestOptions}
* @param databaseSettings {@link CosmosDatabaseProperties}.
* @param throughput the throughput for the database.
* @param options {@link CosmosDatabaseRequestOptions}.
* @return an {@link Mono} containing the single cosmos database response with the created database or an error.
*/
public Mono<CosmosAsyncDatabaseResponse> createDatabase(CosmosDatabaseProperties databaseSettings,
Expand All @@ -317,8 +344,8 @@ public Mono<CosmosAsyncDatabaseResponse> createDatabase(CosmosDatabaseProperties
* created database.
* In case of failure the {@link Mono} will error.
*
* @param databaseSettings {@link CosmosDatabaseProperties}
* @param throughput the throughput for the database
* @param databaseSettings {@link CosmosDatabaseProperties}.
* @param throughput the throughput for the database.
* @return an {@link Mono} containing the single cosmos database response with the created database or an error.
*/
public Mono<CosmosAsyncDatabaseResponse> createDatabase(CosmosDatabaseProperties databaseSettings, int throughput) {
Expand All @@ -335,8 +362,8 @@ public Mono<CosmosAsyncDatabaseResponse> createDatabase(CosmosDatabaseProperties
* created database.
* In case of failure the {@link Mono} will error.
*
* @param id id of the database
* @param throughput the throughput for the database
* @param id id of the database.
* @param throughput the throughput for the database.
* @return a {@link Mono} containing the single cosmos database response with the created database or an error.
*/
public Mono<CosmosAsyncDatabaseResponse> createDatabase(String id, int throughput) {
Expand All @@ -348,9 +375,9 @@ public Mono<CosmosAsyncDatabaseResponse> createDatabase(String id, int throughpu
/**
* Creates a database.
*
* @param id the id
* @param throughputProperties the throughputProperties
* @return the mono
* @param id the id.
* @param throughputProperties the throughputProperties.
* @return the mono.
*/
public Mono<CosmosAsyncDatabaseResponse> createDatabase(String id, ThroughputProperties throughputProperties) {
CosmosDatabaseRequestOptions options = new CosmosDatabaseRequestOptions();
Expand Down Expand Up @@ -432,8 +459,8 @@ public CosmosPagedFlux<CosmosDatabaseProperties> queryDatabases(SqlQuerySpec que
/**
* Gets a database object without making a service call.
*
* @param id name of the database
* @return {@link CosmosAsyncDatabase}
* @param id name of the database.
* @return {@link CosmosAsyncDatabase}.
*/
public CosmosAsyncDatabase getDatabase(String id) {
return new CosmosAsyncDatabase(id, this);
Expand Down
Loading