Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.models.CosmosAsyncItemResponse;
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.models.ThroughputProperties;
import com.codahale.metrics.ConsoleReporter;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricFilter;
Expand Down Expand Up @@ -259,7 +260,11 @@ private void createClients() {
} catch (CosmosException e) {
if (e.getStatusCode() == HttpConstants.StatusCodes.NOTFOUND) {
cosmosAsyncContainer =
cosmosAsyncDatabase.createContainer(this.configuration.getCollectionId(), Configuration.DEFAULT_PARTITION_KEY_PATH, this.configuration.getThroughput()).block().getContainer();
cosmosAsyncDatabase.createContainer(
this.configuration.getCollectionId(),
Configuration.DEFAULT_PARTITION_KEY_PATH,
ThroughputProperties.createManualThroughput(this.configuration.getThroughput())
).block().getContainer();
logger.info("Collection {} is created for this test on host {}", this.configuration.getCollectionId(), endpoint);
if(!databaseCreated) {
collectionListToClear.add(cosmosAsyncContainer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.azure.cosmos.DirectConnectionConfig;
import com.azure.cosmos.GatewayConnectionConfig;
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.models.ThroughputProperties;
import com.codahale.metrics.ConsoleReporter;
import com.codahale.metrics.CsvReporter;
import com.codahale.metrics.Meter;
Expand Down Expand Up @@ -108,7 +109,7 @@ abstract class AsyncBenchmark<T> {
cosmosAsyncContainer = cosmosAsyncDatabase.createContainer(
this.configuration.getCollectionId(),
Configuration.DEFAULT_PARTITION_KEY_PATH,
this.configuration.getThroughput()
ThroughputProperties.createManualThroughput(this.configuration.getThroughput())
).block().getContainer();
logger.info("Collection {} is created for this test", this.configuration.getCollectionId());
collectionCreated = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.azure.cosmos.GatewayConnectionConfig;
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.models.CosmosItemResponse;
import com.azure.cosmos.models.ThroughputProperties;
import com.codahale.metrics.ConsoleReporter;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricFilter;
Expand Down Expand Up @@ -133,7 +134,10 @@ public T apply(T o, Throwable throwable) {
cosmosContainer = cosmosDatabase.getContainer(this.configuration.getCollectionId()).read().getContainer();
} catch (CosmosException e) {
if (e.getStatusCode() == HttpConstants.StatusCodes.NOTFOUND) {
cosmosContainer = cosmosDatabase.createContainer(this.configuration.getCollectionId(), Configuration.DEFAULT_PARTITION_KEY_PATH, this.configuration.getThroughput()).getContainer();
cosmosContainer = cosmosDatabase.createContainer(this.configuration.getCollectionId(),
Configuration.DEFAULT_PARTITION_KEY_PATH,
ThroughputProperties.createManualThroughput(this.configuration.getThroughput()))
.getContainer();
logger.info("Collection {} is created for this test", this.configuration.getCollectionId());
collectionCreated = true;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.azure.cosmos.implementation.CosmosItemProperties;
import com.azure.cosmos.CosmosAsyncClient;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.models.ThroughputProperties;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand Down Expand Up @@ -160,7 +161,7 @@ public static CosmosAsyncContainer createNewCollection(CosmosAsyncClient client,

CosmosContainerRequestOptions requestOptions = new CosmosContainerRequestOptions();

containerResponse = databaseLink.createContainer(containerSettings, 10000, requestOptions).block();
containerResponse = databaseLink.createContainer(containerSettings, ThroughputProperties.createManualThroughput(10000), requestOptions).block();

if (containerResponse == null) {
throw new RuntimeException(String.format("Failed to create collection %s in database %s.", collectionName, databaseName));
Expand Down Expand Up @@ -201,7 +202,7 @@ public static CosmosAsyncContainer createNewLeaseCollection(CosmosAsyncClient cl
CosmosContainerProperties containerSettings = new CosmosContainerProperties(leaseCollectionName, "/id");
CosmosContainerRequestOptions requestOptions = new CosmosContainerRequestOptions();

leaseContainerResponse = databaseLink.createContainer(containerSettings, 400,requestOptions).block();
leaseContainerResponse = databaseLink.createContainer(containerSettings, ThroughputProperties.createManualThroughput(400),requestOptions).block();

if (leaseContainerResponse == null) {
throw new RuntimeException(String.format("Failed to create collection %s in database %s.", leaseCollectionName, databaseName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,40 +2,41 @@
// Licensed under the MIT License.
package com.azure.cosmos;

import com.azure.cosmos.implementation.changefeed.ChangeFeedObserver;
import com.azure.cosmos.implementation.changefeed.implementation.ChangeFeedProcessorBuilderImpl;
import com.azure.cosmos.models.ChangeFeedProcessorOptions;
import com.fasterxml.jackson.databind.JsonNode;
import reactor.core.publisher.Mono;

import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/**
* Simple host for distributing change feed events across observers and thus allowing these observers scale.
* It distributes the load across its instances and allows dynamic scaling:
* - Partitions in partitioned collections are distributed across instances/observers.
* - New instance takes leases from existing instances to make distribution equal.
* - If an instance dies, the leases are distributed across remaining instances.
* It's useful for scenario when partition count is high so that one host/VM is not capable of processing that many
* change feed events.
* Client application needs to implement {@link ChangeFeedObserver} and register processor implementation with
* {@link ChangeFeedProcessor}.
* Simple host for distributing change feed events across observers, simplifying the process of reading the change feeds
* and distributing the processing events across multiple consumers effectively.
* <p>
* It uses auxiliary document collection for managing leases for a partition.
* Every EventProcessorHost instance is performing the following two tasks:
* 1) Renew Leases: It keeps track of leases currently owned by the host and continuously keeps on renewing the leases.
* 2) Acquire Leases: Each instance continuously polls all leases to check if there are any leases it should acquire
* for the system to get into balanced state.
* There are four main components of implementing the change feed processor:
* - The monitored container: the monitored container has the data from which the change feed is generated. Any inserts
* and updates to the monitored container are reflected in the change feed of the container.
* - The lease container: the lease container acts as a state storage and coordinates processing the change feed across
* multiple workers. The lease container can be stored in the same account as the monitored container or in a
* separate account.
* - The host: a host is an application instance that uses the change feed processor to listen for changes. Multiple
* instances with the same lease configuration can run in parallel, but each instance should have a different
* instance name.
* - The delegate: the delegate is the code that defines what you, the developer, want to do with each batch of
* changes that the change feed processor reads.
* <p>
* {@code
* ChangeFeedProcessor changeFeedProcessor = ChangeFeedProcessor.Builder()
* .hostName(hostName)
* .feedContainer(feedContainer)
* .leaseContainer(leaseContainer)
* .handleChanges(docs -> {
* // Implementation for handling and processing CosmosItemProperties list goes here
* })
* for (JsonNode item : docs) {
* // Implementation for handling and processing of each JsonNode item goes here
* }
* })
* .build();
* }
*/
Expand Down Expand Up @@ -63,17 +64,31 @@ public interface ChangeFeedProcessor {
boolean isStarted();

/**
* Helper static method to buildAsyncClient {@link ChangeFeedProcessor} instances
* as logical representation of the Azure Cosmos DB database service.
* Returns the current owner (host) and an approximation of the difference between the last processed item (defined
* by the state of the feed container) and the latest change in the container for each partition (lease
* document).
* <p>
* An empty map will be returned if the processor was not started or no lease documents matching the current
* {@link ChangeFeedProcessor} instance's lease prefix could be found.
*
* @return a map representing the current owner and lease token, the current LSN and latest LSN, and the estimated
* lag, asynchronously.
*/
Mono<Map<String, Integer>> getEstimatedLag();

/**
* Helper static method to build a {@link ChangeFeedProcessor} instance.
* <p>
* {@code
* ChangeFeedProcessor changeFeedProcessor = ChangeFeedProcessor.Builder()
* .hostName(hostName)
* .feedContainer(feedContainer)
* .leaseContainer(leaseContainer)
* .handleChanges(docs -> {
* // Implementation for handling and processing CosmosItemProperties list goes here
* })
* for (JsonNode item : docs) {
* // Implementation for handling and processing of each JsonNode item goes here
* }
* })
* .build();
* }
* @return a changeFeedProcessorBuilder definition instance.
Expand Down Expand Up @@ -121,8 +136,17 @@ interface BuilderDefinition {

/**
* Sets a consumer function which will be called to process changes.
* <p>
* {@code
* An example for how this will look like:
* .handleChanges(docs -> {
* for (JsonNode item : docs) {
* // Implementation for handling and processing of each JsonNode item goes here
* }
* })
* }
*
* @param consumer the consumer of {@link ChangeFeedObserver} to call for handling the feeds.
* @param consumer the {@link Consumer} to call for handling the feeds.
* @return current Builder.
*/
BuilderDefinition handleChanges(Consumer<List<JsonNode>> consumer);
Expand All @@ -136,7 +160,7 @@ interface BuilderDefinition {
BuilderDefinition leaseContainer(CosmosAsyncContainer leaseContainer);

/**
* Builds a new instance of the {@link ChangeFeedProcessor} with the specified configuration asynchronously.
* Builds a new instance of the {@link ChangeFeedProcessor} with the specified configuration.
*
* @return an instance of {@link ChangeFeedProcessor}.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import java.util.Map;

/**
* Represents the consistency levels supported for Cosmos DB client operations in the Azure Cosmos DB database service.
* Represents the consistency levels supported for Azure Cosmos DB client operations in the Azure Cosmos DB service.
* <p>
* The requested ConsistencyLevel must match or be weaker than that provisioned for the database account. Consistency
* levels by order of strength are STRONG, BOUNDED_STALENESS, SESSION and EVENTUAL.
Expand Down
Loading