diff --git a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsynReadWithMultipleClients.java b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsynReadWithMultipleClients.java index 584bd8a16d55..6edebd20e2f9 100644 --- a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsynReadWithMultipleClients.java +++ b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsynReadWithMultipleClients.java @@ -15,6 +15,7 @@ import com.azure.cosmos.implementation.HttpConstants; import com.azure.cosmos.models.CosmosAsyncItemResponse; import com.azure.cosmos.models.PartitionKey; +import com.azure.cosmos.models.ThroughputProperties; import com.codahale.metrics.ConsoleReporter; import com.codahale.metrics.Meter; import com.codahale.metrics.MetricFilter; @@ -259,7 +260,11 @@ private void createClients() { } catch (CosmosException e) { if (e.getStatusCode() == HttpConstants.StatusCodes.NOTFOUND) { cosmosAsyncContainer = - cosmosAsyncDatabase.createContainer(this.configuration.getCollectionId(), Configuration.DEFAULT_PARTITION_KEY_PATH, this.configuration.getThroughput()).block().getContainer(); + cosmosAsyncDatabase.createContainer( + this.configuration.getCollectionId(), + Configuration.DEFAULT_PARTITION_KEY_PATH, + ThroughputProperties.createManualThroughput(this.configuration.getThroughput()) + ).block().getContainer(); logger.info("Collection {} is created for this test on host {}", this.configuration.getCollectionId(), endpoint); if(!databaseCreated) { collectionListToClear.add(cosmosAsyncContainer); diff --git a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncBenchmark.java b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncBenchmark.java index 61829e49a1bd..d4925db34ac0 100644 --- a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncBenchmark.java +++ b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncBenchmark.java @@ -13,6 +13,7 @@ import com.azure.cosmos.DirectConnectionConfig; import com.azure.cosmos.GatewayConnectionConfig; import com.azure.cosmos.implementation.HttpConstants; +import com.azure.cosmos.models.ThroughputProperties; import com.codahale.metrics.ConsoleReporter; import com.codahale.metrics.CsvReporter; import com.codahale.metrics.Meter; @@ -108,7 +109,7 @@ abstract class AsyncBenchmark { cosmosAsyncContainer = cosmosAsyncDatabase.createContainer( this.configuration.getCollectionId(), Configuration.DEFAULT_PARTITION_KEY_PATH, - this.configuration.getThroughput() + ThroughputProperties.createManualThroughput(this.configuration.getThroughput()) ).block().getContainer(); logger.info("Collection {} is created for this test", this.configuration.getCollectionId()); collectionCreated = true; diff --git a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/SyncBenchmark.java b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/SyncBenchmark.java index b117b215017b..caa626b8a3a6 100644 --- a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/SyncBenchmark.java +++ b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/SyncBenchmark.java @@ -14,6 +14,7 @@ import com.azure.cosmos.GatewayConnectionConfig; import com.azure.cosmos.implementation.HttpConstants; import com.azure.cosmos.models.CosmosItemResponse; +import com.azure.cosmos.models.ThroughputProperties; import com.codahale.metrics.ConsoleReporter; import com.codahale.metrics.Meter; import com.codahale.metrics.MetricFilter; @@ -133,7 +134,10 @@ public T apply(T o, Throwable throwable) { cosmosContainer = cosmosDatabase.getContainer(this.configuration.getCollectionId()).read().getContainer(); } catch (CosmosException e) { if (e.getStatusCode() == HttpConstants.StatusCodes.NOTFOUND) { - cosmosContainer = cosmosDatabase.createContainer(this.configuration.getCollectionId(), Configuration.DEFAULT_PARTITION_KEY_PATH, this.configuration.getThroughput()).getContainer(); + cosmosContainer = cosmosDatabase.createContainer(this.configuration.getCollectionId(), + Configuration.DEFAULT_PARTITION_KEY_PATH, + ThroughputProperties.createManualThroughput(this.configuration.getThroughput())) + .getContainer(); logger.info("Collection {} is created for this test", this.configuration.getCollectionId()); collectionCreated = true; } else { diff --git a/sdk/cosmos/azure-cosmos-examples/src/main/java/com/azure/cosmos/examples/ChangeFeed/SampleChangeFeedProcessor.java b/sdk/cosmos/azure-cosmos-examples/src/main/java/com/azure/cosmos/examples/ChangeFeed/SampleChangeFeedProcessor.java index a06e2630c45f..9016afd3219d 100644 --- a/sdk/cosmos/azure-cosmos-examples/src/main/java/com/azure/cosmos/examples/ChangeFeed/SampleChangeFeedProcessor.java +++ b/sdk/cosmos/azure-cosmos-examples/src/main/java/com/azure/cosmos/examples/ChangeFeed/SampleChangeFeedProcessor.java @@ -15,6 +15,7 @@ import com.azure.cosmos.implementation.CosmosItemProperties; import com.azure.cosmos.CosmosAsyncClient; import com.azure.cosmos.implementation.Utils; +import com.azure.cosmos.models.ThroughputProperties; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; @@ -160,7 +161,7 @@ public static CosmosAsyncContainer createNewCollection(CosmosAsyncClient client, CosmosContainerRequestOptions requestOptions = new CosmosContainerRequestOptions(); - containerResponse = databaseLink.createContainer(containerSettings, 10000, requestOptions).block(); + containerResponse = databaseLink.createContainer(containerSettings, ThroughputProperties.createManualThroughput(10000), requestOptions).block(); if (containerResponse == null) { throw new RuntimeException(String.format("Failed to create collection %s in database %s.", collectionName, databaseName)); @@ -201,7 +202,7 @@ public static CosmosAsyncContainer createNewLeaseCollection(CosmosAsyncClient cl CosmosContainerProperties containerSettings = new CosmosContainerProperties(leaseCollectionName, "/id"); CosmosContainerRequestOptions requestOptions = new CosmosContainerRequestOptions(); - leaseContainerResponse = databaseLink.createContainer(containerSettings, 400,requestOptions).block(); + leaseContainerResponse = databaseLink.createContainer(containerSettings, ThroughputProperties.createManualThroughput(400),requestOptions).block(); if (leaseContainerResponse == null) { throw new RuntimeException(String.format("Failed to create collection %s in database %s.", leaseCollectionName, databaseName)); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/ChangeFeedProcessor.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/ChangeFeedProcessor.java index 269c1d1f620f..9a8c63fa0656 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/ChangeFeedProcessor.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/ChangeFeedProcessor.java @@ -2,31 +2,30 @@ // Licensed under the MIT License. package com.azure.cosmos; -import com.azure.cosmos.implementation.changefeed.ChangeFeedObserver; import com.azure.cosmos.implementation.changefeed.implementation.ChangeFeedProcessorBuilderImpl; import com.azure.cosmos.models.ChangeFeedProcessorOptions; import com.fasterxml.jackson.databind.JsonNode; import reactor.core.publisher.Mono; import java.util.List; +import java.util.Map; import java.util.function.Consumer; /** - * Simple host for distributing change feed events across observers and thus allowing these observers scale. - * It distributes the load across its instances and allows dynamic scaling: - * - Partitions in partitioned collections are distributed across instances/observers. - * - New instance takes leases from existing instances to make distribution equal. - * - If an instance dies, the leases are distributed across remaining instances. - * It's useful for scenario when partition count is high so that one host/VM is not capable of processing that many - * change feed events. - * Client application needs to implement {@link ChangeFeedObserver} and register processor implementation with - * {@link ChangeFeedProcessor}. + * Simple host for distributing change feed events across observers, simplifying the process of reading the change feeds + * and distributing the processing events across multiple consumers effectively. *

- * It uses auxiliary document collection for managing leases for a partition. - * Every EventProcessorHost instance is performing the following two tasks: - * 1) Renew Leases: It keeps track of leases currently owned by the host and continuously keeps on renewing the leases. - * 2) Acquire Leases: Each instance continuously polls all leases to check if there are any leases it should acquire - * for the system to get into balanced state. + * There are four main components of implementing the change feed processor: + * - The monitored container: the monitored container has the data from which the change feed is generated. Any inserts + * and updates to the monitored container are reflected in the change feed of the container. + * - The lease container: the lease container acts as a state storage and coordinates processing the change feed across + * multiple workers. The lease container can be stored in the same account as the monitored container or in a + * separate account. + * - The host: a host is an application instance that uses the change feed processor to listen for changes. Multiple + * instances with the same lease configuration can run in parallel, but each instance should have a different + * instance name. + * - The delegate: the delegate is the code that defines what you, the developer, want to do with each batch of + * changes that the change feed processor reads. *

* {@code * ChangeFeedProcessor changeFeedProcessor = ChangeFeedProcessor.Builder() @@ -34,8 +33,10 @@ * .feedContainer(feedContainer) * .leaseContainer(leaseContainer) * .handleChanges(docs -> { - * // Implementation for handling and processing CosmosItemProperties list goes here - * }) + * for (JsonNode item : docs) { + * // Implementation for handling and processing of each JsonNode item goes here + * } + * }) * .build(); * } */ @@ -63,8 +64,20 @@ public interface ChangeFeedProcessor { boolean isStarted(); /** - * Helper static method to buildAsyncClient {@link ChangeFeedProcessor} instances - * as logical representation of the Azure Cosmos DB database service. + * Returns the current owner (host) and an approximation of the difference between the last processed item (defined + * by the state of the feed container) and the latest change in the container for each partition (lease + * document). + *

+ * An empty map will be returned if the processor was not started or no lease documents matching the current + * {@link ChangeFeedProcessor} instance's lease prefix could be found. + * + * @return a map representing the current owner and lease token, the current LSN and latest LSN, and the estimated + * lag, asynchronously. + */ + Mono> getEstimatedLag(); + + /** + * Helper static method to build a {@link ChangeFeedProcessor} instance. *

* {@code * ChangeFeedProcessor changeFeedProcessor = ChangeFeedProcessor.Builder() @@ -72,8 +85,10 @@ public interface ChangeFeedProcessor { * .feedContainer(feedContainer) * .leaseContainer(leaseContainer) * .handleChanges(docs -> { - * // Implementation for handling and processing CosmosItemProperties list goes here - * }) + * for (JsonNode item : docs) { + * // Implementation for handling and processing of each JsonNode item goes here + * } + * }) * .build(); * } * @return a changeFeedProcessorBuilder definition instance. @@ -121,8 +136,17 @@ interface BuilderDefinition { /** * Sets a consumer function which will be called to process changes. + *

+ * {@code + * An example for how this will look like: + * .handleChanges(docs -> { + * for (JsonNode item : docs) { + * // Implementation for handling and processing of each JsonNode item goes here + * } + * }) + * } * - * @param consumer the consumer of {@link ChangeFeedObserver} to call for handling the feeds. + * @param consumer the {@link Consumer} to call for handling the feeds. * @return current Builder. */ BuilderDefinition handleChanges(Consumer> consumer); @@ -136,7 +160,7 @@ interface BuilderDefinition { BuilderDefinition leaseContainer(CosmosAsyncContainer leaseContainer); /** - * Builds a new instance of the {@link ChangeFeedProcessor} with the specified configuration asynchronously. + * Builds a new instance of the {@link ChangeFeedProcessor} with the specified configuration. * * @return an instance of {@link ChangeFeedProcessor}. */ diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/ConsistencyLevel.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/ConsistencyLevel.java index 4131d2df7f3c..63f53132c4a6 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/ConsistencyLevel.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/ConsistencyLevel.java @@ -7,7 +7,7 @@ import java.util.Map; /** - * Represents the consistency levels supported for Cosmos DB client operations in the Azure Cosmos DB database service. + * Represents the consistency levels supported for Azure Cosmos DB client operations in the Azure Cosmos DB service. *

* The requested ConsistencyLevel must match or be weaker than that provisioned for the database account. Consistency * levels by order of strength are STRONG, BOUNDED_STALENESS, SESSION and EVENTUAL. diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncClient.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncClient.java index bcdcb5febc70..92c076a4acb9 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncClient.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncClient.java @@ -31,9 +31,8 @@ import static com.azure.cosmos.implementation.Utils.setContinuationTokenAndMaxItemCount; /** - * Provides a client-side logical representation of the Azure Cosmos database service. - * This asynchronous client is used to configure and execute requests - * against the service. + * Provides a client-side logical representation of the Azure Cosmos DB service. + * This asynchronous client is used to configure and execute requests against the service. */ @ServiceClient( builder = CosmosClientBuilder.class, @@ -85,54 +84,54 @@ AsyncDocumentClient getContextClient() { } /** - * Monitor Cosmos client performance and resource utilization using the specified meter registry + * Monitor Cosmos client performance and resource utilization using the specified meter registry. * - * @param registry meter registry to use for performance monitoring + * @param registry meter registry to use for performance monitoring. */ static void setMonitorTelemetry(MeterRegistry registry) { RntbdMetrics.add(registry); } /** - * Get the service endpoint + * Get the service endpoint. * - * @return the service endpoint + * @return the service endpoint. */ String getServiceEndpoint() { return serviceEndpoint; } /** - * Gets the key or resource token + * Gets the key or resource token. * - * @return get the key or resource token + * @return get the key or resource token. */ String getKeyOrResourceToken() { return keyOrResourceToken; } /** - * Get the connection policy + * Get the connection policy. * - * @return {@link ConnectionPolicy} + * @return {@link ConnectionPolicy}. */ ConnectionPolicy getConnectionPolicy() { return connectionPolicy; } /** - * Gets the consistency level + * Gets the consistency level. * - * @return the (@link ConsistencyLevel) + * @return the {@link ConsistencyLevel}. */ ConsistencyLevel getDesiredConsistencyLevel() { return desiredConsistencyLevel; } /** - * Gets the permission list + * Gets the permission list. * - * @return the permission list + * @return the permission list. */ List getPermissions() { return permissions; @@ -143,27 +142,27 @@ AsyncDocumentClient getDocClientWrapper() { } /** - * Gets the configs + * Gets the configs. * - * @return the configs + * @return the configs. */ Configs getConfigs() { return configs; } /** - * Gets the token resolver + * Gets the token resolver. * - * @return the token resolver + * @return the token resolver. */ CosmosAuthorizationTokenResolver getCosmosAuthorizationTokenResolver() { return cosmosAuthorizationTokenResolver; } /** - * Gets the azure key credential + * Gets the azure key credential. * - * @return azure key credential + * @return azure key credential. */ AzureKeyCredential credential() { return credential; @@ -179,19 +178,19 @@ AzureKeyCredential credential() { * * By-default, this is false. * - * @return a boolean indicating whether resource will be included in the response or not + * @return a boolean indicating whether resource will be included in the response or not. */ boolean isContentResponseOnWriteEnabled() { return contentResponseOnWriteEnabled; } /** - * CREATE a Database if it does not already exist on the service + * CREATE a Database if it does not already exist on the service. *

* The {@link Mono} upon successful completion will contain a single cosmos database response with the * created or existing database. * - * @param databaseSettings CosmosDatabaseProperties + * @param databaseSettings CosmosDatabaseProperties. * @return a {@link Mono} containing the cosmos database response with the created or existing database or * an error. */ @@ -200,13 +199,14 @@ public Mono createDatabaseIfNotExists(CosmosDatabas } /** - * CREATE a Database if it does not already exist on the service + * Create a Database if it does not already exist on the service. + *

* The {@link Mono} upon successful completion will contain a single cosmos database response with the * created or existing database. * - * @param id the id of the database + * @param id the id of the database. * @return a {@link Mono} containing the cosmos database response with the created or existing database or - * an error + * an error. */ public Mono createDatabaseIfNotExists(String id) { return createDatabaseIfNotExistsInternal(getDatabase(id)); @@ -226,6 +226,32 @@ private Mono createDatabaseIfNotExistsInternal(Cosm }); } + /** + * Create a Database if it does not already exist on the service. + *

+ * The {@link Mono} upon successful completion will contain a single cosmos database response with the + * created or existing database. + * + * @param id the id. + * @param throughputProperties the throughputProperties. + * @return the mono. + */ + public Mono createDatabaseIfNotExists(String id, ThroughputProperties throughputProperties) { + return this.getDatabase(id).read().onErrorResume(exception -> { + final Throwable unwrappedException = Exceptions.unwrap(exception); + if (unwrappedException instanceof CosmosException) { + final CosmosException cosmosException = (CosmosException) unwrappedException; + if (cosmosException.getStatusCode() == HttpConstants.StatusCodes.NOTFOUND) { + CosmosDatabaseRequestOptions options = new CosmosDatabaseRequestOptions(); + ModelBridgeInternal.setThroughputProperties(options, throughputProperties); + return createDatabase(new CosmosDatabaseProperties(id), + options); + } + } + return Mono.error(unwrappedException); + }); + } + /** * Creates a database. *

@@ -234,8 +260,8 @@ private Mono createDatabaseIfNotExistsInternal(Cosm * created database. * In case of failure the {@link Mono} will error. * - * @param databaseSettings {@link CosmosDatabaseProperties} - * @param options {@link CosmosDatabaseRequestOptions} + * @param databaseSettings {@link CosmosDatabaseProperties}. + * @param options {@link CosmosDatabaseRequestOptions}. * @return an {@link Mono} containing the single cosmos database response with the created database or an error. */ public Mono createDatabase(CosmosDatabaseProperties databaseSettings, @@ -259,7 +285,7 @@ public Mono createDatabase(CosmosDatabaseProperties * created database. * In case of failure the {@link Mono} will error. * - * @param databaseSettings {@link CosmosDatabaseProperties} + * @param databaseSettings {@link CosmosDatabaseProperties}. * @return an {@link Mono} containing the single cosmos database response with the created database or an error. */ public Mono createDatabase(CosmosDatabaseProperties databaseSettings) { @@ -274,7 +300,7 @@ public Mono createDatabase(CosmosDatabaseProperties * created database. * In case of failure the {@link Mono} will error. * - * @param id id of the database + * @param id id of the database. * @return a {@link Mono} containing the single cosmos database response with the created database or an error. */ public Mono createDatabase(String id) { @@ -289,18 +315,18 @@ public Mono createDatabase(String id) { * created database. * In case of failure the {@link Mono} will error. * - * @param databaseSettings {@link CosmosDatabaseProperties} - * @param throughput the throughput for the database - * @param options {@link CosmosDatabaseRequestOptions} + * @param databaseSettings {@link CosmosDatabaseProperties}. + * @param throughputProperties the throughput properties for the database. + * @param options {@link CosmosDatabaseRequestOptions}. * @return an {@link Mono} containing the single cosmos database response with the created database or an error. */ public Mono createDatabase(CosmosDatabaseProperties databaseSettings, - int throughput, + ThroughputProperties throughputProperties, CosmosDatabaseRequestOptions options) { if (options == null) { options = new CosmosDatabaseRequestOptions(); } - ModelBridgeInternal.setOfferThroughput(options, throughput); + ModelBridgeInternal.setThroughputProperties(options, throughputProperties); Database wrappedDatabase = new Database(); wrappedDatabase.setId(databaseSettings.getId()); return asyncDocumentClient.createDatabase(wrappedDatabase, ModelBridgeInternal.toRequestOptions(options)) @@ -317,13 +343,13 @@ public Mono createDatabase(CosmosDatabaseProperties * created database. * In case of failure the {@link Mono} will error. * - * @param databaseSettings {@link CosmosDatabaseProperties} - * @param throughput the throughput for the database + * @param databaseSettings {@link CosmosDatabaseProperties}. + * @param throughputProperties the throughput properties for the database. * @return an {@link Mono} containing the single cosmos database response with the created database or an error. */ - public Mono createDatabase(CosmosDatabaseProperties databaseSettings, int throughput) { + public Mono createDatabase(CosmosDatabaseProperties databaseSettings, ThroughputProperties throughputProperties) { CosmosDatabaseRequestOptions options = new CosmosDatabaseRequestOptions(); - ModelBridgeInternal.setOfferThroughput(options, throughput); + ModelBridgeInternal.setThroughputProperties(options, throughputProperties); return createDatabase(databaseSettings, options); } @@ -335,26 +361,26 @@ public Mono createDatabase(CosmosDatabaseProperties * created database. * In case of failure the {@link Mono} will error. * - * @param id id of the database - * @param throughput the throughput for the database + * @param id id of the database. + * @param throughput the throughput for the database. * @return a {@link Mono} containing the single cosmos database response with the created database or an error. */ - public Mono createDatabase(String id, int throughput) { + Mono createDatabase(String id, int throughput) { CosmosDatabaseRequestOptions options = new CosmosDatabaseRequestOptions(); - ModelBridgeInternal.setOfferThroughput(options, throughput); + ModelBridgeInternal.setThroughputProperties(options, ThroughputProperties.createManualThroughput(throughput)); return createDatabase(new CosmosDatabaseProperties(id), options); } /** * Creates a database. * - * @param id the id - * @param throughputProperties the throughputProperties - * @return the mono + * @param id the id. + * @param throughputProperties the throughputProperties. + * @return the mono. */ public Mono createDatabase(String id, ThroughputProperties throughputProperties) { CosmosDatabaseRequestOptions options = new CosmosDatabaseRequestOptions(); - ModelBridgeInternal.setOfferProperties(options, throughputProperties); + ModelBridgeInternal.setThroughputProperties(options, throughputProperties); return createDatabase(new CosmosDatabaseProperties(id), options); } @@ -432,8 +458,8 @@ public CosmosPagedFlux queryDatabases(SqlQuerySpec que /** * Gets a database object without making a service call. * - * @param id name of the database - * @return {@link CosmosAsyncDatabase} + * @param id name of the database. + * @return {@link CosmosAsyncDatabase}. */ public CosmosAsyncDatabase getDatabase(String id) { return new CosmosAsyncDatabase(id, this); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncContainer.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncContainer.java index 314ae55a9623..4fd6781a4900 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncContainer.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncContainer.java @@ -48,22 +48,22 @@ public class CosmosAsyncContainer { } /** - * Get the id of the {@link CosmosAsyncContainer} + * Get the id of the {@link CosmosAsyncContainer}. * - * @return the id of the {@link CosmosAsyncContainer} + * @return the id of the {@link CosmosAsyncContainer}. */ public String getId() { return id; } /** - * Reads the document container + * Reads the current container. *

* After subscription the operation will be performed. The {@link Mono} upon - * successful completion will contain a single cosmos container response with + * successful completion will contain a single Cosmos container response with * the read container. In case of failure the {@link Mono} will error. * - * @return an {@link Mono} containing the single cosmos container response with + * @return an {@link Mono} containing the single Cosmos container response with * the read container or an error. */ public Mono read() { @@ -71,14 +71,14 @@ public Mono read() { } /** - * Reads the container + * Reads the current container while specifying additional options such as If-Match. *

* After subscription the operation will be performed. The {@link Mono} upon - * successful completion will contain a single cosmos container response with + * successful completion will contain a single Cosmos container response with * the read container. In case of failure the {@link Mono} will error. * - * @param options The cosmos container request options. - * @return an {@link Mono} containing the single cosmos container response with + * @param options the Cosmos container request options. + * @return an {@link Mono} containing the single Cosmos container response with * the read container or an error. */ public Mono read(CosmosContainerRequestOptions options) { @@ -93,11 +93,11 @@ public Mono read(CosmosContainerRequestOptions opt * Deletes the item container *

* After subscription the operation will be performed. The {@link Mono} upon - * successful completion will contain a single cosmos container response for the + * successful completion will contain a single Cosmos container response for the * deleted database. In case of failure the {@link Mono} will error. * * @param options the request options. - * @return an {@link Mono} containing the single cosmos container response for + * @return an {@link Mono} containing the single Cosmos container response for * the deleted database or an error. */ public Mono delete(CosmosContainerRequestOptions options) { @@ -109,13 +109,13 @@ public Mono delete(CosmosContainerRequestOptions o } /** - * Deletes the item container + * Deletes the current container. *

* After subscription the operation will be performed. The {@link Mono} upon - * successful completion will contain a single cosmos container response for the + * successful completion will contain a single Cosmos container response for the * deleted container. In case of failure the {@link Mono} will error. * - * @return an {@link Mono} containing the single cosmos container response for + * @return an {@link Mono} containing the single Cosmos container response for * the deleted container or an error. */ public Mono delete() { @@ -123,33 +123,33 @@ public Mono delete() { } /** - * Replaces a document container. + * Replaces the current container's properties. *

* After subscription the operation will be performed. The {@link Mono} upon - * successful completion will contain a single cosmos container response with - * the replaced document container. In case of failure the {@link Mono} will + * successful completion will contain a single Cosmos container response with + * the replaced container properties. In case of failure the {@link Mono} will * error. * * @param containerProperties the item container properties - * @return an {@link Mono} containing the single cosmos container response with - * the replaced document container or an error. + * @return an {@link Mono} containing the single Cosmos container response with + * the replaced container properties or an error. */ public Mono replace(CosmosContainerProperties containerProperties) { return replace(containerProperties, null); } /** - * Replaces a document container. + * Replaces the current container properties while using non-default request options. *

* After subscription the operation will be performed. The {@link Mono} upon - * successful completion will contain a single cosmos container response with - * the replaced document container. In case of failure the {@link Mono} will + * successful completion will contain a single Cosmos container response with + * the replaced container properties. In case of failure the {@link Mono} will * error. * * @param containerProperties the item container properties - * @param options the cosmos container request options. - * @return an {@link Mono} containing the single cosmos container response with - * the replaced document container or an error. + * @param options the Cosmos container request options. + * @return an {@link Mono} containing the single Cosmos container response with + * the replaced container properties or an error. */ public Mono replace( CosmosContainerProperties containerProperties, @@ -165,33 +165,33 @@ public Mono replace( /* CosmosAsyncItem operations */ /** - * Creates a cosmos item. + * Creates an item. *

* After subscription the operation will be performed. The {@link Mono} upon * successful completion will contain a single resource response with the - * created cosmos item. In case of failure the {@link Mono} will error. + * created Cosmos item. In case of failure the {@link Mono} will error. * - * @param the type parameter - * @param item the cosmos item represented as a POJO or cosmos item object. + * @param the type parameter. + * @param item the Cosmos item represented as a POJO or Cosmos item object. * @return an {@link Mono} containing the single resource response with the - * created cosmos item or an error. + * created Cosmos item or an error. */ public Mono> createItem(T item) { return createItem(item, new CosmosItemRequestOptions()); } /** - * Creates a cosmos item. + * Creates an item. *

* After subscription the operation will be performed. The {@link Mono} upon * successful completion will contain a single resource response with the - * created cosmos item. In case of failure the {@link Mono} will error. + * created Cosmos item. In case of failure the {@link Mono} will error. * - * @param the type parameter - * @param item the cosmos item represented as a POJO or cosmos item object. - * @param partitionKey the partition key + * @param the type parameter. + * @param item the Cosmos item represented as a POJO or Cosmos item object. + * @param partitionKey the partition key. * @param options the request options. - * @return an {@link Mono} containing the single resource response with the created cosmos item or an error. + * @return an {@link Mono} containing the single resource response with the created Cosmos item or an error. */ public Mono> createItem( T item, @@ -206,12 +206,12 @@ public Mono> createItem( /** - * Create an item. + * Creates a Cosmos item. * - * @param the type parameter - * @param item the item - * @param options the item request options - * @return an {@link Mono} containing the single resource response with the created cosmos item or an error. + * @param the type parameter. + * @param item the item. + * @param options the item request options. + * @return an {@link Mono} containing the single resource response with the created Cosmos item or an error. */ public Mono> createItem(T item, CosmosItemRequestOptions options) { if (options == null) { @@ -236,25 +236,25 @@ public Mono> createItem(T item, CosmosItemRequest * successful completion will contain a single resource response with the * upserted item. In case of failure the {@link Mono} will error. * - * @param the type parameter + * @param the type parameter. * @param item the item represented as a POJO or Item object to upsert. - * @return an {@link Mono} containing the single resource response with the upserted document or an error. + * @return an {@link Mono} containing the single resource response with the upserted item or an error. */ public Mono> upsertItem(T item) { return upsertItem(item, new CosmosItemRequestOptions()); } /** - * Upserts a cosmos item. + * Upserts an item. *

* After subscription the operation will be performed. The {@link Mono} upon * successful completion will contain a single resource response with the * upserted item. In case of failure the {@link Mono} will error. * - * @param the type parameter + * @param the type parameter. * @param item the item represented as a POJO or Item object to upsert. * @param options the request options. - * @return an {@link Mono} containing the single resource response with the upserted document or an error. + * @return an {@link Mono} containing the single resource response with the upserted item or an error. */ public Mono> upsertItem(T item, CosmosItemRequestOptions options) { if (options == null) { @@ -271,35 +271,35 @@ public Mono> upsertItem(T item, CosmosItemRequest } /** - * Reads all cosmos items in the container. + * Reads all the items in the current container. *

* After subscription the operation will be performed. The {@link CosmosPagedFlux} will - * contain one or several feed response of the read cosmos items. In case of + * contain one or several feed response of the read Cosmos items. In case of * failure the {@link CosmosPagedFlux} will error. * - * @param the type parameter - * @param classType the class type - * @return a {@link CosmosPagedFlux} containing one or several feed response pages of the read cosmos items or an + * @param the type parameter. + * @param classType the class type. + * @return a {@link CosmosPagedFlux} containing one or several feed response pages of the read Cosmos items or an * error. */ - public CosmosPagedFlux readAllItems(Class classType) { + CosmosPagedFlux readAllItems(Class classType) { return readAllItems(new FeedOptions(), classType); } /** - * Reads all cosmos items in a container. + * Reads all the items in the current container. *

* After subscription the operation will be performed. The {@link CosmosPagedFlux} will - * contain one or several feed response of the read cosmos items. In case of + * contain one or several feed response of the read Cosmos items. In case of * failure the {@link CosmosPagedFlux} will error. * - * @param the type parameter + * @param the type parameter. * @param options the feed options. - * @param classType the class type - * @return a {@link CosmosPagedFlux} containing one or several feed response pages of the read cosmos items or an + * @param classType the class type. + * @return a {@link CosmosPagedFlux} containing one or several feed response pages of the read Cosmos items or an * error. */ - public CosmosPagedFlux readAllItems(FeedOptions options, Class classType) { + CosmosPagedFlux readAllItems(FeedOptions options, Class classType) { return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> { setContinuationTokenAndMaxItemCount(pagedFluxOptions, options); return getDatabase().getDocClientWrapper().readDocuments(getLink(), options).map( @@ -308,15 +308,15 @@ public CosmosPagedFlux readAllItems(FeedOptions options, Class classTy } /** - * Query for documents in a items in a container + * Query for items in the current container. *

* After subscription the operation will be performed. The {@link CosmosPagedFlux} will * contain one or several feed response of the obtained items. In case of * failure the {@link CosmosPagedFlux} will error. * - * @param the type parameter + * @param the type parameter. * @param query the query. - * @param classType the class type + * @param classType the class type. * @return a {@link CosmosPagedFlux} containing one or several feed response pages of the obtained items or an * error. */ @@ -325,16 +325,16 @@ public CosmosPagedFlux queryItems(String query, Class classType) { } /** - * Query for documents in a items in a container + * Query for items in the current container using a string. *

* After subscription the operation will be performed. The {@link CosmosPagedFlux} will * contain one or several feed response of the obtained items. In case of * failure the {@link CosmosPagedFlux} will error. * - * @param the type parameter + * @param the type parameter. * @param query the query. * @param options the feed options. - * @param classType the class type + * @param classType the class type. * @return a {@link CosmosPagedFlux} containing one or several feed response pages of the obtained items or an * error. */ @@ -343,15 +343,15 @@ public CosmosPagedFlux queryItems(String query, FeedOptions options, Clas } /** - * Query for documents in a items in a container + * Query for items in the current container using a {@link SqlQuerySpec}. *

* After subscription the operation will be performed. The {@link CosmosPagedFlux} will * contain one or several feed response of the obtained items. In case of * failure the {@link CosmosPagedFlux} will error. * - * @param the type parameter + * @param the type parameter. * @param querySpec the SQL query specification. - * @param classType the class type + * @param classType the class type. * @return a {@link CosmosPagedFlux} containing one or several feed response pages of the obtained items or an * error. */ @@ -360,16 +360,16 @@ public CosmosPagedFlux queryItems(SqlQuerySpec querySpec, Class classT } /** - * Query for documents in a items in a container + * Query for items in the current container using a {@link SqlQuerySpec} and {@link FeedOptions}. *

* After subscription the operation will be performed. The {@link Flux} will * contain one or several feed response of the obtained items. In case of * failure the {@link CosmosPagedFlux} will error. * - * @param the type parameter + * @param the type parameter. * @param querySpec the SQL query specification. * @param options the feed options. - * @param classType the class type + * @param classType the class type. * @return a {@link CosmosPagedFlux} containing one or several feed response pages of the obtained items or an * error. */ @@ -400,32 +400,30 @@ private FeedResponse prepareFeedResponse(FeedResponse response, * Reads an item. *

* After subscription the operation will be performed. - * The {@link Mono} upon successful completion will contain a cosmos item response with the read item - * In case of failure the {@link Mono} will error. + * The {@link Mono} upon successful completion will contain an item response with the read item. * - * @param the type parameter - * @param itemId the item id - * @param partitionKey the partition key - * @param itemType the item type - * @return an {@link Mono} containing the cosmos item response with the read item or an error + * @param the type parameter. + * @param itemId the item id. + * @param partitionKey the partition key. + * @param itemType the item type. + * @return an {@link Mono} containing the Cosmos item response with the read item or an error. */ public Mono> readItem(String itemId, PartitionKey partitionKey, Class itemType) { return readItem(itemId, partitionKey, ModelBridgeInternal.createCosmosItemRequestOptions(partitionKey), itemType); } /** - * Reads an item. + * Reads an item using a configured {@link CosmosItemRequestOptions}. *

* After subscription the operation will be performed. - * The {@link Mono} upon successful completion will contain a cosmos item response with the read item - * In case of failure the {@link Mono} will error. + * The {@link Mono} upon successful completion will contain a Cosmos item response with the read item. * - * @param the type parameter - * @param itemId the item id - * @param partitionKey the partition key - * @param options the request cosmosItemRequestOptions - * @param itemType the item type - * @return an {@link Mono} containing the cosmos item response with the read item or an error + * @param the type parameter. + * @param itemId the item id. + * @param partitionKey the partition key. + * @param options the request {@link CosmosItemRequestOptions}. + * @param itemType the item type. + * @return an {@link Mono} containing the Cosmos item response with the read item or an error. */ public Mono> readItem( String itemId, PartitionKey partitionKey, @@ -445,14 +443,13 @@ public Mono> readItem( * Replaces an item with the passed in item. *

* After subscription the operation will be performed. - * The {@link Mono} upon successful completion will contain a single cosmos item response with the replaced item. - * In case of failure the {@link Mono} will error. + * The {@link Mono} upon successful completion will contain a single Cosmos item response with the replaced item. * - * @param the type parameter - * @param item the item to replace (containing the document id). - * @param itemId the item id - * @param partitionKey the partition key - * @return an {@link Mono} containing the cosmos item resource response with the replaced item or an error. + * @param the type parameter. + * @param item the item to replace (containing the item id). + * @param itemId the item id. + * @param partitionKey the partition key. + * @return an {@link Mono} containing the Cosmos item resource response with the replaced item or an error. */ public Mono> replaceItem(T item, String itemId, PartitionKey partitionKey) { return replaceItem(item, itemId, partitionKey, new CosmosItemRequestOptions()); @@ -462,15 +459,14 @@ public Mono> replaceItem(T item, String itemId, P * Replaces an item with the passed in item. *

* After subscription the operation will be performed. - * The {@link Mono} upon successful completion will contain a single cosmos item response with the replaced item. - * In case of failure the {@link Mono} will error. + * The {@link Mono} upon successful completion will contain a single Cosmos item response with the replaced item. * - * @param the type parameter - * @param item the item to replace (containing the document id). - * @param itemId the item id - * @param partitionKey the partition key - * @param options the request comosItemRequestOptions - * @return an {@link Mono} containing the cosmos item resource response with the replaced item or an error. + * @param the type parameter. + * @param item the item to replace (containing the item id). + * @param itemId the item id. + * @param partitionKey the partition key. + * @param options the request comosItemRequestOptions. + * @return an {@link Mono} containing the Cosmos item resource response with the replaced item or an error. */ public Mono> replaceItem( T item, String itemId, PartitionKey partitionKey, @@ -490,15 +486,14 @@ public Mono> replaceItem( } /** - * Deletes the item. + * Deletes an item. *

* After subscription the operation will be performed. - * The {@link Mono} upon successful completion will contain a single cosmos item response with the replaced item. - * In case of failure the {@link Mono} will error. + * The {@link Mono} upon successful completion will contain a single Cosmos item response with the replaced item. * - * @param itemId the item id - * @param partitionKey the partition key - * @return an {@link Mono} containing the cosmos item resource response. + * @param itemId the item id. + * @param partitionKey the partition key. + * @return an {@link Mono} containing the Cosmos item resource response. */ public Mono> deleteItem(String itemId, PartitionKey partitionKey) { return deleteItem(itemId, partitionKey, new CosmosItemRequestOptions()); @@ -508,13 +503,12 @@ public Mono> deleteItem(String itemId, Partition * Deletes the item. *

* After subscription the operation will be performed. - * The {@link Mono} upon successful completion will contain a single cosmos item response with the replaced item. - * In case of failure the {@link Mono} will error. + * The {@link Mono} upon successful completion will contain a single Cosmos item response with the replaced item. * - * @param itemId id of the item - * @param partitionKey partitionKey of the item - * @param options the request options - * @return an {@link Mono} containing the cosmos item resource response. + * @param itemId id of the item. + * @param partitionKey partitionKey of the item. + * @param options the request options. + * @return an {@link Mono} containing the Cosmos item resource response. */ public Mono> deleteItem( String itemId, PartitionKey partitionKey, @@ -542,9 +536,11 @@ private String getItemLink(String itemId) { } /** - * Gets scripts. This can be used to perform various operations on cosmos scripts + * Gets a {@link CosmosAsyncScripts} using the current container as context. + *

+ * This can be further used to perform various operations on Cosmos scripts. * - * @return the {@link CosmosAsyncScripts} + * @return the {@link CosmosAsyncScripts}. */ public CosmosAsyncScripts getScripts() { if (this.scripts == null) { @@ -554,7 +550,7 @@ public CosmosAsyncScripts getScripts() { } /** - * Lists all the conflicts in the container + * Lists all the conflicts in the current container. * * @param options the feed options * @return a {@link CosmosPagedFlux} containing one or several feed response pages of the @@ -571,9 +567,9 @@ public CosmosPagedFlux readAllConflicts(FeedOptions op } /** - * Queries all the conflicts in the container + * Queries all the conflicts in the current container. * - * @param query the query + * @param query the query. * @return a {@link CosmosPagedFlux} containing one or several feed response pages of the * obtained conflicts or an error. */ @@ -582,10 +578,10 @@ public CosmosPagedFlux queryConflicts(String query) { } /** - * Queries all the conflicts in the container + * Queries all the conflicts in the current container. * - * @param query the query - * @param options the feed options + * @param query the query. + * @param options the feed options. * @return a {@link CosmosPagedFlux} containing one or several feed response pages of the * obtained conflicts or an error. */ @@ -600,73 +596,20 @@ public CosmosPagedFlux queryConflicts(String query, Fe } /** - * Gets a CosmosAsyncConflict object without making a service call + * Gets a {@link CosmosAsyncConflict} object using current container for context. * - * @param id id of the cosmos conflict - * @return a cosmos conflict + * @param id the id of the Cosmos conflict. + * @return a Cosmos conflict. */ public CosmosAsyncConflict getConflict(String id) { return new CosmosAsyncConflict(id, this); } /** - * Gets the throughput of the container + * Replace the throughput provisioned for the current container. * - * @return a {@link Mono} containing throughput or an error. - */ - public Mono readProvisionedThroughput() { - return this.read() - .flatMap(cosmosContainerResponse -> - database.getDocClientWrapper() - .queryOffers("select * from c where c.offerResourceId = '" - + cosmosContainerResponse.getProperties() - .getResourceId() + "'", new FeedOptions()) - .single()) - .flatMap(offerFeedResponse -> { - if (offerFeedResponse.getResults().isEmpty()) { - return Mono.error( - BridgeInternal.createCosmosException(HttpConstants.StatusCodes.BADREQUEST, - "No offers found for the resource")); - } - return database.getDocClientWrapper() - .readOffer(offerFeedResponse.getResults().get(0).getSelfLink()) - .single(); - }).map(cosmosOfferResponse -> cosmosOfferResponse.getResource().getThroughput()); - } - - /** - * Sets throughput provisioned for a container in measurement of - * Requests-per-Unit in the Azure Cosmos service. - * - * @param requestUnitsPerSecond the cosmos container throughput, expressed in - * Request Units per second - * @return a {@link Mono} containing throughput or an error. - */ - public Mono replaceProvisionedThroughput(int requestUnitsPerSecond) { - return this.read() - .flatMap(cosmosContainerResponse -> - database.getDocClientWrapper() - .queryOffers("select * from c where c.offerResourceId = '" - + cosmosContainerResponse.getProperties() - .getResourceId() + "'", new FeedOptions()) - .single()) - .flatMap(offerFeedResponse -> { - if (offerFeedResponse.getResults().isEmpty()) { - return Mono.error( - BridgeInternal.createCosmosException(HttpConstants.StatusCodes.BADREQUEST, - "No offers found for the resource")); - } - Offer offer = offerFeedResponse.getResults().get(0); - offer.setThroughput(requestUnitsPerSecond); - return database.getDocClientWrapper().replaceOffer(offer).single(); - }).map(offerResourceResponse -> offerResourceResponse.getResource().getThroughput()); - } - - /** - * Replace the throughput . - * - * @param throughputProperties the throughput properties - * @return the mono containing throughput response + * @param throughputProperties the throughput properties. + * @return the mono containing throughput response. */ public Mono replaceThroughput(ThroughputProperties throughputProperties) { return this.read() @@ -695,9 +638,9 @@ public Mono replaceThroughput(ThroughputProperties throughpu } /** - * Read the throughput throughput . + * Read the throughput provisioned for the current container. * - * @return the mono containing throughput response + * @return the mono containing throughput response. */ public Mono readThroughput() { return this.read() @@ -725,9 +668,9 @@ public Mono readThroughput() { /** - * Gets the parent Database + * Gets the parent {@link CosmosAsyncDatabase} for the current container. * - * @return the {@link CosmosAsyncDatabase} + * @return the {@link CosmosAsyncDatabase}. */ public CosmosAsyncDatabase getDatabase() { return database; diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncDatabase.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncDatabase.java index d58196c99a18..ff45cc5b6817 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncDatabase.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncDatabase.java @@ -45,9 +45,9 @@ public class CosmosAsyncDatabase { } /** - * Get the id of the CosmosAsyncDatabase + * Get the id of the CosmosAsyncDatabase. * - * @return the id of the CosmosAsyncDatabase + * @return the id of the CosmosAsyncDatabase. */ public String getId() { return id; @@ -93,7 +93,7 @@ public Mono read(CosmosDatabaseRequestOptions optio * successful completion will contain a cosmos database response with the * deleted database. In case of failure the {@link Mono} will error. * - * @return an {@link Mono} containing the single cosmos database response + * @return an {@link Mono} containing the single cosmos database response. */ public Mono delete() { return delete(new CosmosDatabaseRequestOptions()); @@ -106,8 +106,8 @@ public Mono delete() { * successful completion will contain a cosmos database response with the * deleted database. In case of failure the {@link Mono} will error. * - * @param options the request options - * @return an {@link Mono} containing the single cosmos database response + * @param options the request options. + * @return an {@link Mono} containing the single cosmos database response. */ public Mono delete(CosmosDatabaseRequestOptions options) { if (options == null) { @@ -129,7 +129,7 @@ public Mono delete(CosmosDatabaseRequestOptions opt * @param containerProperties the container properties. * @return a {@link Mono} containing the single cosmos container response with * the created container or an error. - * @throws IllegalArgumentException containerProperties cannot be null + * @throws IllegalArgumentException containerProperties cannot be null. */ public Mono createContainer(CosmosContainerProperties containerProperties) { return createContainer(containerProperties, new CosmosContainerRequestOptions()); @@ -143,35 +143,35 @@ public Mono createContainer(CosmosContainerPropert * created container. In case of failure the {@link Mono} will error. * * @param containerProperties the container properties. - * @param throughput the throughput for the container + * @param throughput the throughput for the container. * @return a {@link Mono} containing the single cosmos container response with * the created container or an error. - * @throws IllegalArgumentException thown if containerProerties are null + * @throws IllegalArgumentException thown if containerProerties are null. */ - public Mono createContainer( + Mono createContainer( CosmosContainerProperties containerProperties, int throughput) { if (containerProperties == null) { throw new IllegalArgumentException("containerProperties"); } CosmosContainerRequestOptions options = new CosmosContainerRequestOptions(); - ModelBridgeInternal.setOfferThroughput(options, throughput); + ModelBridgeInternal.setThroughputProperties(options, ThroughputProperties.createManualThroughput(throughput)); return createContainer(containerProperties, options); } /** * Creates a container. * - * @param containerProperties the container properties - * @param throughputProperties the throughput properties - * @param options the request options - * @return the mono + * @param containerProperties the container properties. + * @param throughputProperties the throughput properties. + * @param options the request options. + * @return the mono. */ public Mono createContainer( CosmosContainerProperties containerProperties, ThroughputProperties throughputProperties, CosmosContainerRequestOptions options){ - ModelBridgeInternal.setOfferProperties(options, throughputProperties); + ModelBridgeInternal.setThroughputProperties(options, throughputProperties); return createContainer(containerProperties, options); } @@ -183,10 +183,10 @@ public Mono createContainer( * created container. In case of failure the {@link Mono} will error. * * @param containerProperties the containerProperties. - * @param options the cosmos container request options + * @param options the cosmos container request options. * @return a {@link Mono} containing the cosmos container response with the * created container or an error. - * @throws IllegalArgumentException containerProperties can not be null + * @throws IllegalArgumentException containerProperties can not be null. */ public Mono createContainer( CosmosContainerProperties containerProperties, @@ -211,20 +211,20 @@ public Mono createContainer( * created container. In case of failure the {@link Mono} will error. * * @param containerProperties the containerProperties. - * @param throughput the throughput for the container - * @param options the cosmos container request options + * @param throughput the throughput for the container. + * @param options the cosmos container request options. * @return a {@link Mono} containing the cosmos container response with the * created container or an error. - * @throws IllegalArgumentException containerProperties cannot be null + * @throws IllegalArgumentException containerProperties cannot be null. */ - public Mono createContainer( + Mono createContainer( CosmosContainerProperties containerProperties, int throughput, CosmosContainerRequestOptions options) { if (options == null) { options = new CosmosContainerRequestOptions(); } - ModelBridgeInternal.setOfferThroughput(options, throughput); + ModelBridgeInternal.setThroughputProperties(options, ThroughputProperties.createManualThroughput(throughput)); return createContainer(containerProperties, options); } @@ -235,8 +235,8 @@ public Mono createContainer( * successful completion will contain a cosmos container response with the * created container. In case of failure the {@link Mono} will error. * - * @param id the cosmos container id - * @param partitionKeyPath the partition key path + * @param id the cosmos container id. + * @param partitionKeyPath the partition key path. * @return a {@link Mono} containing the cosmos container response with the * created container or an error. */ @@ -251,15 +251,15 @@ public Mono createContainer(String id, String part * successful completion will contain a cosmos container response with the * created container. In case of failure the {@link Mono} will error. * - * @param id the cosmos container id - * @param partitionKeyPath the partition key path - * @param throughput the throughput for the container + * @param id the cosmos container id. + * @param partitionKeyPath the partition key path. + * @param throughputProperties the throughput properties for the container. * @return a {@link Mono} containing the cosmos container response with the * created container or an error. */ - public Mono createContainer(String id, String partitionKeyPath, int throughput) { + public Mono createContainer(String id, String partitionKeyPath, ThroughputProperties throughputProperties) { CosmosContainerRequestOptions options = new CosmosContainerRequestOptions(); - ModelBridgeInternal.setOfferThroughput(options, throughput); + ModelBridgeInternal.setThroughputProperties(options, throughputProperties); return createContainer(new CosmosContainerProperties(id, partitionKeyPath), options); } @@ -284,21 +284,49 @@ public Mono createContainerIfNotExists( /** * Creates a document container if it does not exist on the service. *

+ * The throughput setting will only be used if the specified container + * does not exist and therefor a new container will be created. + * * After subscription the operation will be performed. The {@link Mono} upon * successful completion will contain a cosmos container response with the * created or existing container. In case of failure the {@link Mono} will * error. * - * @param containerProperties the container properties - * @param throughput the throughput for the container + * @param containerProperties the container properties. + * @param throughput the throughput for the container. * @return a {@link Mono} containing the cosmos container response with the * created or existing container or an error. */ - public Mono createContainerIfNotExists( + Mono createContainerIfNotExists( CosmosContainerProperties containerProperties, int throughput) { CosmosContainerRequestOptions options = new CosmosContainerRequestOptions(); - ModelBridgeInternal.setOfferThroughput(options, throughput); + ModelBridgeInternal.setThroughputProperties(options, ThroughputProperties.createManualThroughput(throughput)); + CosmosAsyncContainer container = getContainer(containerProperties.getId()); + return createContainerIfNotExistsInternal(containerProperties, container, options); + } + + /** + * Creates a document container if it does not exist on the service. + *

+ * The throughput properties will only be used if the specified container + * does not exist and therefor a new container will be created. + * + * After subscription the operation will be performed. The {@link Mono} upon + * successful completion will contain a cosmos container response with the + * created or existing container. In case of failure the {@link Mono} will + * error. + * + * @param containerProperties the container properties. + * @param throughputProperties the throughput properties for the container. + * @return a {@link Mono} containing the cosmos container response with the + * created or existing container or an error. + */ + public Mono createContainerIfNotExists( + CosmosContainerProperties containerProperties, + ThroughputProperties throughputProperties) { + CosmosContainerRequestOptions options = new CosmosContainerRequestOptions(); + ModelBridgeInternal.setThroughputProperties(options, throughputProperties); CosmosAsyncContainer container = getContainer(containerProperties.getId()); return createContainerIfNotExistsInternal(containerProperties, container, options); } @@ -310,8 +338,8 @@ public Mono createContainerIfNotExists( * successful completion will contain a cosmos container response with the * created container. In case of failure the {@link Mono} will error. * - * @param id the cosmos container id - * @param partitionKeyPath the partition key path + * @param id the cosmos container id. + * @param partitionKeyPath the partition key path. * @return a {@link Mono} containing the cosmos container response with the * created container or an error. */ @@ -325,26 +353,55 @@ public Mono createContainerIfNotExists(String id, /** * Creates a document container if it does not exist on the service. *

+ * The throughput setting will only be used if the specified container + * does not exist and a new container will be created. + * * After subscription the operation will be performed. The {@link Mono} upon * successful completion will contain a cosmos container response with the * created container. In case of failure the {@link Mono} will error. * - * @param id the cosmos container id - * @param partitionKeyPath the partition key path - * @param throughput the throughput for the container + * @param id the cosmos container id. + * @param partitionKeyPath the partition key path. + * @param throughput the throughput for the container. * @return a {@link Mono} containing the cosmos container response with the * created container or an error. */ - public Mono createContainerIfNotExists( + Mono createContainerIfNotExists( String id, String partitionKeyPath, int throughput) { CosmosContainerRequestOptions options = new CosmosContainerRequestOptions(); - ModelBridgeInternal.setOfferThroughput(options, throughput); + ModelBridgeInternal.setThroughputProperties(options, ThroughputProperties.createManualThroughput(throughput)); CosmosAsyncContainer container = getContainer(id); return createContainerIfNotExistsInternal(new CosmosContainerProperties(id, partitionKeyPath), container, options); } + /** + * Creates a document container if it does not exist on the service. + *

+ * The throughput properties will only be used if the specified container + * does not exist and therefor a new container will be created. + * + * After subscription the operation will be performed. The {@link Mono} upon + * successful completion will contain a cosmos container response with the + * created container. In case of failure the {@link Mono} will error. + * + * @param id the cosmos container id. + * @param partitionKeyPath the partition key path. + * @param throughputProperties the throughput properties for the container. + * @return a {@link Mono} containing the cosmos container response with the + * created container or an error. + */ + public Mono createContainerIfNotExists( + String id, String partitionKeyPath, + ThroughputProperties throughputProperties) { + CosmosContainerRequestOptions options = new CosmosContainerRequestOptions(); + ModelBridgeInternal.setThroughputProperties(options, throughputProperties); + CosmosAsyncContainer container = getContainer(id); + return createContainerIfNotExistsInternal(new CosmosContainerProperties(id, partitionKeyPath), container, + options); + } + private Mono createContainerIfNotExistsInternal( CosmosContainerProperties containerProperties, CosmosAsyncContainer container, CosmosContainerRequestOptions options) { @@ -549,7 +606,7 @@ public CosmosPagedFlux readAllUsers(FeedOptions options) { * contain one or several feed response of the obtained users. In case of * failure the {@link CosmosPagedFlux} will error. * - * @param query query as string + * @param query query as string. * @return a {@link CosmosPagedFlux} containing one or several feed response pages of the * obtained users or an error. */ @@ -564,8 +621,8 @@ public CosmosPagedFlux queryUsers(String query) { * contain one or several feed response of the obtained users. In case of * failure the {@link CosmosPagedFlux} will error. * - * @param query query as string - * @param options the feed options + * @param query query as string. + * @param options the feed options. * @return a {@link CosmosPagedFlux} containing one or several feed response pages of the * obtained users or an error. */ @@ -620,74 +677,12 @@ public CosmosAsyncUser getUser(String id) { return new CosmosAsyncUser(id, this); } - /** - * Gets the throughput of the database - * - * @return a {@link Mono} containing throughput or an error. - */ - public Mono readProvisionedThroughput() { - return this.read() - .flatMap(cosmosDatabaseResponse -> getDocClientWrapper() - .queryOffers("select * from c where c.offerResourceId = '" - + cosmosDatabaseResponse.getProperties() - .getResourceId() + "'", - new FeedOptions()) - .single() - .flatMap(offerFeedResponse -> { - if (offerFeedResponse.getResults().isEmpty()) { - return Mono.error(BridgeInternal - .createCosmosException( - HttpConstants.StatusCodes.BADREQUEST, - "No offers found for the resource")); - } - return getDocClientWrapper() - .readOffer(offerFeedResponse.getResults() - .get(0) - .getSelfLink()) - .single(); - }).map(cosmosContainerResponse1 -> cosmosContainerResponse1 - .getResource() - .getThroughput())); - } - - /** - * Sets throughput provisioned for a container in measurement of - * Requests-per-Unit in the Azure Cosmos service. - * - * @param requestUnitsPerSecond the cosmos container throughput, expressed in - * Request Units per second - * @return a {@link Mono} containing throughput or an error. - */ - public Mono replaceProvisionedThroughput(int requestUnitsPerSecond) { - return this.read() - .flatMap(cosmosDatabaseResponse -> this.getDocClientWrapper() - .queryOffers("select * from c where c.offerResourceId = '" - + cosmosDatabaseResponse.getProperties() - .getResourceId() - + "'", new FeedOptions()) - .single() - .flatMap(offerFeedResponse -> { - if (offerFeedResponse.getResults().isEmpty()) { - return Mono.error(BridgeInternal - .createCosmosException( - HttpConstants.StatusCodes.BADREQUEST, - "No offers found for the resource")); - } - Offer offer = offerFeedResponse.getResults().get(0); - offer.setThroughput(requestUnitsPerSecond); - return this.getDocClientWrapper().replaceOffer(offer) - .single(); - }).map(offerResourceResponse -> offerResourceResponse - .getResource() - .getThroughput())); - } - /** * Sets throughput provisioned for a container in measurement of * Requests-per-Unit in the Azure Cosmos service. * - * @param throughputProperties the throughput properties - * @return the mono + * @param throughputProperties the throughput properties. + * @return the mono. */ public Mono replaceThroughput(ThroughputProperties throughputProperties) { return this.read() @@ -717,9 +712,9 @@ public Mono replaceThroughput(ThroughputProperties throughpu } /** - * Gets the throughput of the database + * Gets the throughput of the database. * - * @return the mono containing throughput response + * @return the mono containing throughput response. */ public Mono readThroughput() { return this.read() diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosClient.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosClient.java index 55fc2d4364ed..fd98ccdfb7ea 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosClient.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosClient.java @@ -11,6 +11,7 @@ import com.azure.cosmos.models.FeedOptions; import com.azure.cosmos.models.ModelBridgeInternal; import com.azure.cosmos.models.SqlQuerySpec; +import com.azure.cosmos.models.ThroughputProperties; import com.azure.cosmos.util.CosmosPagedFlux; import com.azure.cosmos.util.CosmosPagedIterable; import com.azure.cosmos.util.UtilBridgeInternal; @@ -20,7 +21,7 @@ import java.io.Closeable; /** - * Provides a client-side logical representation of the Azure Cosmos database service. + * Provides a client-side logical representation of the Azure Cosmos DB service. * SyncClient is used to perform operations in a synchronous way */ @ServiceClient(builder = CosmosClientBuilder.class) @@ -32,9 +33,9 @@ public final class CosmosClient implements Closeable { } /** - * Create a Database if it does not already exist on the service + * Create a Cosmos database if it does not already exist on the service. * - * @param databaseProperties {@link CosmosDatabaseProperties} the database properties + * @param databaseProperties {@link CosmosDatabaseProperties} the database properties. * @return the {@link CosmosDatabaseResponse} with the created database. */ public CosmosDatabaseResponse createDatabaseIfNotExists(CosmosDatabaseProperties databaseProperties) { @@ -42,16 +43,26 @@ public CosmosDatabaseResponse createDatabaseIfNotExists(CosmosDatabaseProperties } /** - * Create a Database if it does not already exist on the service + * Create a Database if it does not already exist on the service. * - * @param id the id of the database + * @param id the id of the database. + * @param throughputProperties the throughputProperties. + * @return the {@link CosmosDatabaseResponse} with the created database. + */ + public CosmosDatabaseResponse createDatabaseIfNotExists(String id, ThroughputProperties throughputProperties) { + return mapDatabaseResponseAndBlock(asyncClientWrapper.createDatabaseIfNotExists(id, throughputProperties)); + } + + /** + * Create a Database if it does not already exist on the service. + * + * @param id the id of the database. * @return the {@link CosmosDatabaseResponse} with the created database. */ public CosmosDatabaseResponse createDatabaseIfNotExists(String id) { return mapDatabaseResponseAndBlock(asyncClientWrapper.createDatabaseIfNotExists(id)); } - /** * Creates a database. * @@ -77,7 +88,7 @@ public CosmosDatabaseResponse createDatabase(CosmosDatabaseProperties databasePr /** * Creates a database. * - * @param id the id of the database + * @param id the id of the database. * @return the {@link CosmosDatabaseResponse} with the created database. */ public CosmosDatabaseResponse createDatabase(String id) { @@ -89,38 +100,37 @@ public CosmosDatabaseResponse createDatabase(String id) { * Creates a database. * * @param databaseProperties {@link CosmosDatabaseProperties} the database properties. - * @param throughput the throughput - * @param options {@link CosmosDatabaseRequestOptions} the request options + * @param throughputProperties the throughput properties. + * @param options {@link CosmosDatabaseRequestOptions} the request options. * @return the {@link CosmosDatabaseResponse} with the created database. */ public CosmosDatabaseResponse createDatabase(CosmosDatabaseProperties databaseProperties, - int throughput, + ThroughputProperties throughputProperties, CosmosDatabaseRequestOptions options) { - return mapDatabaseResponseAndBlock(asyncClientWrapper.createDatabase(databaseProperties, throughput, options)); + return mapDatabaseResponseAndBlock(asyncClientWrapper.createDatabase(databaseProperties, throughputProperties, options)); } /** * Creates a database. * * @param databaseProperties {@link CosmosDatabaseProperties} the database properties. - * @param throughput the throughput + * @param throughputProperties the throughput properties. * @return the {@link CosmosDatabaseResponse} with the created database. */ public CosmosDatabaseResponse createDatabase(CosmosDatabaseProperties databaseProperties, - int throughput) { - return mapDatabaseResponseAndBlock(asyncClientWrapper.createDatabase(databaseProperties, throughput)); + ThroughputProperties throughputProperties) { + return mapDatabaseResponseAndBlock(asyncClientWrapper.createDatabase(databaseProperties, throughputProperties)); } - /** * Creates a database. * - * @param id the id of the database - * @param throughput the throughput + * @param id the id of the database. + * @param throughputProperties the throughput properties. * @return the {@link CosmosDatabaseResponse} with the created database. */ - public CosmosDatabaseResponse createDatabase(String id, int throughput) { - return mapDatabaseResponseAndBlock(asyncClientWrapper.createDatabase(id, throughput)); + public CosmosDatabaseResponse createDatabase(String id, ThroughputProperties throughputProperties) { + return mapDatabaseResponseAndBlock(asyncClientWrapper.createDatabase(id, throughputProperties)); } CosmosDatabaseResponse mapDatabaseResponseAndBlock(Mono databaseMono) { @@ -158,9 +168,9 @@ public CosmosPagedIterable readAllDatabases() { } /** - * Query a database + * Query a database. * - * @param query the query + * @param query the query. * @param options {@link FeedOptions}the feed options. * @return the {@link CosmosPagedIterable} for feed response with the obtained databases. */ @@ -169,10 +179,10 @@ public CosmosPagedIterable queryDatabases(String query } /** - * Query a database + * Query a database. * - * @param querySpec {@link SqlQuerySpec} the query spec - * @param options the query + * @param querySpec {@link SqlQuerySpec} the query spec. + * @param options the query. * @return the {@link CosmosPagedIterable} for feed response with the obtained databases. */ public CosmosPagedIterable queryDatabases(SqlQuerySpec querySpec, @@ -181,10 +191,10 @@ public CosmosPagedIterable queryDatabases(SqlQuerySpec } /** - * Gets the database client + * Gets the database client. * - * @param id the id of the database - * @return {@link CosmosDatabase} the cosmos sync database + * @param id the id of the database. + * @return {@link CosmosDatabase} the cosmos sync database. */ public CosmosDatabase getDatabase(String id) { return new CosmosDatabase(id, this, asyncClientWrapper.getDatabase(id)); @@ -199,7 +209,7 @@ CosmosAsyncClient asyncClient() { } /** - * Close this {@link CosmosClient} instance + * Close this {@link CosmosClient} instance. */ public void close() { asyncClientWrapper.close(); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosContainer.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosContainer.java index 0a76b9027dba..0f60e18334f8 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosContainer.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosContainer.java @@ -35,9 +35,9 @@ public class CosmosContainer { /** * Instantiates a new Cosmos sync container. * - * @param id the id - * @param database the database - * @param container the container + * @param id the container id. + * @param database the database. + * @param container the container. */ CosmosContainer(String id, CosmosDatabase database, CosmosAsyncContainer container) { this.id = id; @@ -46,68 +46,68 @@ public class CosmosContainer { } /** - * Id string. + * Gets the current container id. * - * @return the string + * @return the container id. */ public String getId() { return id; } /** - * Read cosmos sync container response. + * Reads the current container. * - * @return the cosmos sync container response + * @return the Cosmos container response with the read container. */ public CosmosContainerResponse read() { return database.mapContainerResponseAndBlock(this.asyncContainer.read()); } /** - * Read cosmos sync container response. + * Reads the current container while specifying additional options such as If-Match. * - * @param options the options - * @return the cosmos sync container response + * @param options the options. + * @return the Cosmos sync container response. */ public CosmosContainerResponse read(CosmosContainerRequestOptions options) { return database.mapContainerResponseAndBlock(this.asyncContainer.read(options)); } /** - * Delete cosmos sync container response. + * Deletes the current cosmos sync container response while specifying additional options such as If-Match. * - * @param options the options - * @return the cosmos sync container response + * @param options the options. + * @return the cosmos sync container response. */ public CosmosContainerResponse delete(CosmosContainerRequestOptions options) { return database.mapContainerResponseAndBlock(this.asyncContainer.delete(options)); } /** - * Delete cosmos sync container response. + * Deletes the current cosmos sync container response. * - * @return the cosmos sync container response + * @return the cosmos sync container response. */ public CosmosContainerResponse delete() { return database.mapContainerResponseAndBlock(this.asyncContainer.delete()); } /** - * Replace cosmos sync container response. + * Replaces the current container properties. * - * @param containerProperties the container properties - * @return the cosmos sync container response + * @param containerProperties the container properties. + * @return the cosmos sync container response. */ public CosmosContainerResponse replace(CosmosContainerProperties containerProperties) { return database.mapContainerResponseAndBlock(this.asyncContainer.replace(containerProperties)); } /** - * Replace cosmos sync container response. + * Replaces the current container properties while specifying additional options such as If-Match. * - * @param containerProperties the container properties - * @param options the options - * @return the cosmos sync container response + * @param containerProperties the container properties. + * @param options the options. + * @return the cosmos sync container response. */ public CosmosContainerResponse replace(CosmosContainerProperties containerProperties, CosmosContainerRequestOptions options) { @@ -115,65 +115,46 @@ public CosmosContainerResponse replace(CosmosContainerProperties containerProper } /** - * Read provisioned throughput integer. + * Sets the throughput for the current container. * - * @return the integer. null response indicates database doesn't have any provisioned RUs - */ - public Integer readProvisionedThroughput() { - return database.throughputResponseToBlock(this.asyncContainer.readProvisionedThroughput()); - } - - /** - * Replace provisioned throughput integer. - * - * @param requestUnitsPerSecond the request units per second - * @return the integer - */ - public Integer replaceProvisionedThroughput(int requestUnitsPerSecond) { - return database.throughputResponseToBlock(this.asyncContainer - .replaceProvisionedThroughput(requestUnitsPerSecond)); - } - - /** - * Sets the throughput. - * - * @param throughputProperties the throughput properties - * @return the throughput response + * @param throughputProperties the throughput properties. + * @return the throughput response. */ public ThroughputResponse replaceThroughput(ThroughputProperties throughputProperties) { return database.throughputResponseToBlock(this.asyncContainer.replaceThroughput(throughputProperties)); } /** - * Gets the throughput. + * Gets the throughput for the current container. * - * @return the throughput response + * @return the throughput response. */ public ThroughputResponse readThroughput() { return database.throughputResponseToBlock(this.asyncContainer.readThroughput()); } - /* CosmosAsyncItem operations */ + /* CosmosItem operations */ /** - * Create item cosmos sync item response. + * Creates a new item synchronously and returns its respective Cosmos item response. * * @param the type parameter * @param item the item - * @return the cosmos sync item response + * @return the Cosmos sync item response */ public CosmosItemResponse createItem(T item) { return this.mapItemResponseAndBlock(this.asyncContainer.createItem(item)); } /** - * Create a cosmos item synchronously. + * Creates a new item synchronously and returns its respective Cosmos item response + * while specifying additional options. * - * @param the type parameter - * @param item the item - * @param partitionKey the partition key - * @param options the options - * @return the cosmos sync item response + * @param the type parameter. + * @param item the item. + * @param partitionKey the partition key. + * @param options the options. + * @return the Cosmos sync item response. */ public CosmosItemResponse createItem(T item, PartitionKey partitionKey, @@ -182,12 +163,15 @@ public CosmosItemResponse createItem(T item, } /** - * Create a cosmos item. + * Creates a new item synchronously and returns its respective Cosmos item response + * while specifying additional options. + *

+ * The partition key value will be automatically extracted from the item's content. * - * @param the type parameter - * @param item the item - * @param options the options - * @return the cosmos item response + * @param the type parameter. + * @param item the item. + * @param options the options. + * @return the cosmos item response. */ public CosmosItemResponse createItem(T item, CosmosItemRequestOptions options) { @@ -195,23 +179,23 @@ public CosmosItemResponse createItem(T item, CosmosItemRequestOptions opt } /** - * Upsert item cosmos sync item response. + * Upserts an Cosmos item in the current container. * - * @param the type parameter - * @param item the item - * @return the cosmos sync item response + * @param the type parameter. + * @param item the item. + * @return the Cosmos sync item response. */ public CosmosItemResponse upsertItem(T item) { return this.mapItemResponseAndBlock(this.asyncContainer.upsertItem(item)); } /** - * Upsert item cosmos sync item response. + * Upserts a item Cosmos sync item while specifying additional options. * - * @param the type parameter - * @param item the item - * @param options the options - * @return the cosmos sync item response + * @param the type parameter. + * @param item the item. + * @param options the options. + * @return the Cosmos sync item response. */ @SuppressWarnings("unchecked") // Note: @kushagraThapar and @moderakh to ensure this casting is valid @@ -220,10 +204,10 @@ public CosmosItemResponse upsertItem(Object item, CosmosItemRequestOption } /** - * Map item response and block cosmos sync item response. + * Maps item response and block cosmos sync item response. * - * @param itemMono the item mono - * @return the cosmos sync item response + * @param itemMono the item mono. + * @return the cosmos sync item response. */ CosmosItemResponse mapItemResponseAndBlock(Mono> itemMono) { try { @@ -256,51 +240,51 @@ private CosmosItemResponse mapDeleteItemResponseAndBlock(Mono the type parameter - * @param options the options - * @param classType the classType - * @return the {@link CosmosPagedIterable} + * @param the type parameter. + * @param options the options. + * @param classType the classType. + * @return the {@link CosmosPagedIterable}. */ - public CosmosPagedIterable readAllItems(FeedOptions options, Class classType) { + CosmosPagedIterable readAllItems(FeedOptions options, Class classType) { return getCosmosPagedIterable(this.asyncContainer.readAllItems(options, classType)); } /** - * Query items {@link CosmosPagedIterable}. + * Query items in the current container returning the results as {@link CosmosPagedIterable}. * - * @param the type parameter - * @param query the query - * @param options the options - * @param classType the class type - * @return the {@link CosmosPagedIterable} + * @param the type parameter. + * @param query the query. + * @param options the options. + * @param classType the class type. + * @return the {@link CosmosPagedIterable}. */ public CosmosPagedIterable queryItems(String query, FeedOptions options, Class classType) { return getCosmosPagedIterable(this.asyncContainer.queryItems(query, options, classType)); } /** - * Query items {@link CosmosPagedIterable}. + * Query items in the current container returning the results as {@link CosmosPagedIterable}. * - * @param the type parameter - * @param querySpec the query spec - * @param options the options - * @param classType the class type - * @return the {@link CosmosPagedIterable} + * @param the type parameter. + * @param querySpec the query spec. + * @param options the options. + * @param classType the class type. + * @return the {@link CosmosPagedIterable}. */ public CosmosPagedIterable queryItems(SqlQuerySpec querySpec, FeedOptions options, Class classType) { return getCosmosPagedIterable(this.asyncContainer.queryItems(querySpec, options, classType)); } /** - * Read cosmos sync item response. + * Reads an item in the current container. * - * @param the type parameter - * @param itemId the item id - * @param partitionKey the partition key - * @param itemType the class type of item - * @return the cosmos sync item response + * @param the type parameter. + * @param itemId the item id. + * @param partitionKey the partition key. + * @param itemType the class type of item. + * @return the Cosmos sync item response. */ public CosmosItemResponse readItem(String itemId, PartitionKey partitionKey, Class itemType) { return this.mapItemResponseAndBlock(asyncContainer.readItem(itemId, @@ -310,14 +294,14 @@ public CosmosItemResponse readItem(String itemId, PartitionKey partitionK } /** - * Read cosmos sync item response. + * Reads an item in the current container while specifying additional options. * - * @param the type parameter - * @param itemId the item id - * @param partitionKey the partition key - * @param options the options - * @param itemType the class type of item - * @return the cosmos sync item response + * @param the type parameter. + * @param itemId the item id. + * @param partitionKey the partition key. + * @param options the options. + * @param itemType the class type of item. + * @return the Cosmos sync item response. */ public CosmosItemResponse readItem( String itemId, PartitionKey partitionKey, @@ -326,14 +310,14 @@ public CosmosItemResponse readItem( } /** - * Replace cosmos sync item response. + * Replaces an item in the current container. * - * @param the type parameter - * @param item the item - * @param itemId the item id - * @param partitionKey the partition key - * @param options the options - * @return the cosmos sync item response + * @param the type parameter. + * @param item the item. + * @param itemId the item id. + * @param partitionKey the partition key. + * @param options the options. + * @return the Cosmos sync item response. */ public CosmosItemResponse replaceItem(T item, String itemId, @@ -343,12 +327,12 @@ public CosmosItemResponse replaceItem(T item, } /** - * Delete cosmos sync item response. + * Deletes an item in the current container. * - * @param itemId the item id - * @param partitionKey the partition key - * @param options the options - * @return the cosmos sync item response + * @param itemId the item id. + * @param partitionKey the partition key. + * @param options the options. + * @return the Cosmos sync item response. */ public CosmosItemResponse deleteItem(String itemId, PartitionKey partitionKey, CosmosItemRequestOptions options) { @@ -356,9 +340,9 @@ public CosmosItemResponse deleteItem(String itemId, PartitionKey partiti } /** - * Gets the cosmos sync scripts. + * Gets the cosmos scripts using the current container as context. * - * @return the cosmos sync scripts + * @return the cosmos sync scripts. */ public CosmosScripts getScripts() { if (this.scripts == null) { @@ -370,10 +354,10 @@ public CosmosScripts getScripts() { // TODO: should make partitionkey public in CosmosAsyncItem and fix the below call /** - * Convert response cosmos sync item response. + * Convert a {@link CosmosAsyncItemResponse} to a Cosmos sync item response. * - * @param response the cosmos item response - * @return the cosmos sync item response + * @param response the cosmos item response. + * @return the cosmos sync item response. */ private CosmosItemResponse convertResponse(CosmosAsyncItemResponse response) { return ModelBridgeInternal.createCosmosItemResponse(response); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosDatabase.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosDatabase.java index 08dd1f6af8c8..88a2be0082df 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosDatabase.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosDatabase.java @@ -36,9 +36,9 @@ public class CosmosDatabase { /** * Instantiates a new Cosmos sync database. * - * @param id the id - * @param client the client - * @param database the database + * @param id the id. + * @param client the client. + * @param database the database. */ CosmosDatabase(String id, CosmosClient client, CosmosAsyncDatabase database) { this.id = id; @@ -49,16 +49,16 @@ public class CosmosDatabase { /** * Get the id of the CosmosAsyncDatabase * - * @return the id of the database + * @return the id of the database. */ public String getId() { return id; } /** - * Reads a database + * Reads a database. * - * @return the {@link CosmosDatabaseResponse} + * @return the {@link CosmosDatabaseResponse}. */ public CosmosDatabaseResponse read() { return client.mapDatabaseResponseAndBlock((databaseWrapper.read())); @@ -77,7 +77,7 @@ public CosmosDatabaseResponse read(CosmosDatabaseRequestOptions options) { /** * Delete a database. * - * @return the {@link CosmosDatabaseResponse} + * @return the {@link CosmosDatabaseResponse}. */ public CosmosDatabaseResponse delete() { return client.mapDatabaseResponseAndBlock(databaseWrapper.delete()); @@ -87,7 +87,7 @@ public CosmosDatabaseResponse delete() { * Delete a database. * * @param options the {@link CosmosDatabaseRequestOptions} request options. - * @return the {@link CosmosDatabaseResponse} + * @return the {@link CosmosDatabaseResponse}. */ public CosmosDatabaseResponse delete(CosmosDatabaseRequestOptions options) { return client.mapDatabaseResponseAndBlock(databaseWrapper.delete(options)); @@ -98,7 +98,7 @@ public CosmosDatabaseResponse delete(CosmosDatabaseRequestOptions options) { /** * Creates a cosmos container. * - * @param containerProperties the {@link CosmosContainerProperties} + * @param containerProperties the {@link CosmosContainerProperties}. * @return the {@link CosmosContainerResponse} with the created container. */ public CosmosContainerResponse createContainer(CosmosContainerProperties containerProperties) { @@ -108,8 +108,8 @@ public CosmosContainerResponse createContainer(CosmosContainerProperties contain /** * Creates a cosmos container. * - * @param containerProperties the {@link CosmosContainerProperties} - * @param throughput the throughput + * @param containerProperties the {@link CosmosContainerProperties}. + * @param throughput the throughput. * @return the {@link CosmosContainerResponse} with the created container. */ public CosmosContainerResponse createContainer( @@ -121,8 +121,8 @@ public CosmosContainerResponse createContainer( /** * Creates a cosmos container. * - * @param containerProperties the {@link CosmosContainerProperties} - * @param options the {@link CosmosContainerProperties} + * @param containerProperties the {@link CosmosContainerProperties}. + * @param options the {@link CosmosContainerProperties}. * @return the {@link CosmosContainerResponse} with the created container. */ public CosmosContainerResponse createContainer( @@ -134,9 +134,9 @@ public CosmosContainerResponse createContainer( /** * Creates a cosmos container. * - * @param containerProperties the {@link CosmosContainerProperties} - * @param throughput the throughput - * @param options the {@link CosmosContainerProperties} + * @param containerProperties the {@link CosmosContainerProperties}. + * @param throughput the throughput. + * @param options the {@link CosmosContainerProperties}. * @return the {@link CosmosContainerResponse} with the created container. */ public CosmosContainerResponse createContainer( @@ -151,10 +151,10 @@ public CosmosContainerResponse createContainer( /** * Creates a cosmos container. * - * @param containerProperties the container properties - * @param throughputProperties the throughput properties - * @param options the options - * @return the cosmos container response + * @param containerProperties the container properties. + * @param throughputProperties the throughput properties. + * @param options the options. + * @return the cosmos container response. */ public CosmosContainerResponse createContainer( CosmosContainerProperties containerProperties, @@ -168,9 +168,9 @@ public CosmosContainerResponse createContainer( /** * Create container cosmos sync container response. * - * @param id the id - * @param partitionKeyPath the partition key path - * @return the cosmos sync container response + * @param id the id. + * @param partitionKeyPath the partition key path. + * @return the cosmos sync container response. */ public CosmosContainerResponse createContainer(String id, String partitionKeyPath) { return this.mapContainerResponseAndBlock(databaseWrapper.createContainer(id, partitionKeyPath)); @@ -179,20 +179,20 @@ public CosmosContainerResponse createContainer(String id, String partitionKeyPat /** * Create container cosmos sync container response. * - * @param id the id - * @param partitionKeyPath the partition key path - * @param throughput the throughput - * @return the cosmos sync container response + * @param id the id. + * @param partitionKeyPath the partition key path. + * @param throughputProperties the throughput properties. + * @return the cosmos sync container response. */ - public CosmosContainerResponse createContainer(String id, String partitionKeyPath, int throughput) { - return this.mapContainerResponseAndBlock(databaseWrapper.createContainer(id, partitionKeyPath, throughput)); + public CosmosContainerResponse createContainer(String id, String partitionKeyPath, ThroughputProperties throughputProperties) { + return this.mapContainerResponseAndBlock(databaseWrapper.createContainer(id, partitionKeyPath, throughputProperties)); } /** * Create container if not exists cosmos sync container response. * - * @param containerProperties the container properties - * @return the cosmos sync container response + * @param containerProperties the container properties. + * @return the cosmos sync container response. */ public CosmosContainerResponse createContainerIfNotExists(CosmosContainerProperties containerProperties) { return this.mapContainerResponseAndBlock(databaseWrapper.createContainerIfNotExists(containerProperties)); @@ -201,23 +201,40 @@ public CosmosContainerResponse createContainerIfNotExists(CosmosContainerPropert /** * Create container if not exists cosmos sync container response. * - * @param containerProperties the container properties - * @param throughput the throughput - * @return the cosmos sync container response + * @param containerProperties the container properties. + * @param throughput the throughput. + * @return the cosmos sync container response. */ public CosmosContainerResponse createContainerIfNotExists( CosmosContainerProperties containerProperties, int throughput) { return this.mapContainerResponseAndBlock(databaseWrapper.createContainerIfNotExists(containerProperties, - throughput)); + throughput)); + } + + /** + * Create container if not exists cosmos sync container response. + *

+ * The throughput properties will only be used if the specified container + * does not exist and therefor a new container will be created. + * + * @param containerProperties the container properties. + * @param throughputProperties the throughput properties for the container. + * @return the cosmos sync container response. + */ + public CosmosContainerResponse createContainerIfNotExists( + CosmosContainerProperties containerProperties, + ThroughputProperties throughputProperties) { + return this.mapContainerResponseAndBlock(databaseWrapper.createContainerIfNotExists(containerProperties, + throughputProperties)); } /** * Create container if not exists cosmos sync container response. * - * @param id the id - * @param partitionKeyPath the partition key path - * @return the cosmos sync container response + * @param id the id. + * @param partitionKeyPath the partition key path. + * @return the cosmos sync container response. */ public CosmosContainerResponse createContainerIfNotExists( String id, @@ -227,25 +244,47 @@ public CosmosContainerResponse createContainerIfNotExists( /** * Create container if not exists cosmos sync container response. + *

+ * The throughput settings will only be used if the specified container + * does not exist and therefor a new container will be created. * - * @param id the id - * @param partitionKeyPath the partition key path - * @param throughput the throughput - * @return the cosmos sync container response + * @param id the id. + * @param partitionKeyPath the partition key path. + * @param throughput the throughput. + * @return the cosmos sync container response. */ public CosmosContainerResponse createContainerIfNotExists( String id, String partitionKeyPath, int throughput) { return this.mapContainerResponseAndBlock(databaseWrapper.createContainerIfNotExists(id, - partitionKeyPath, - throughput)); + partitionKeyPath, + throughput)); + } + + /** + * Create container if not exists cosmos sync container response. + *

+ * The throughput properties will only be used if the specified container + * does not exist and therefor a new container will be created. + * + * @param id the id. + * @param partitionKeyPath the partition key path. + * @param throughputProperties the throughput properties for the container. + * @return the cosmos sync container response. + */ + public CosmosContainerResponse createContainerIfNotExists( + String id, String partitionKeyPath, + ThroughputProperties throughputProperties) { + return this.mapContainerResponseAndBlock(databaseWrapper.createContainerIfNotExists(id, + partitionKeyPath, + throughputProperties)); } /** * Map container response and block cosmos sync container response. * - * @param containerMono the container mono - * @return the cosmos sync container response + * @param containerMono the container mono. + * @return the cosmos sync container response. */ CosmosContainerResponse mapContainerResponseAndBlock(Mono containerMono) { try { @@ -265,8 +304,8 @@ CosmosContainerResponse mapContainerResponseAndBlock(Mono readAllContainers(FeedOptions options) { return getCosmosPagedIterable(databaseWrapper.readAllContainers(options)); @@ -275,7 +314,7 @@ public CosmosPagedIterable readAllContainers(FeedOpti /** * Read all containers iterator. * - @return the {@link CosmosPagedIterable} + @return the {@link CosmosPagedIterable}. */ public CosmosPagedIterable readAllContainers() { return getCosmosPagedIterable(databaseWrapper.readAllContainers()); @@ -284,8 +323,8 @@ public CosmosPagedIterable readAllContainers() { /** * Query containers iterator. * - * @param query the query - * @return the {@link CosmosPagedIterable} + * @param query the query. + * @return the {@link CosmosPagedIterable}. */ public CosmosPagedIterable queryContainers(String query) { return getCosmosPagedIterable(databaseWrapper.queryContainers(query)); @@ -294,9 +333,9 @@ public CosmosPagedIterable queryContainers(String que /** * Query containers iterator. * - * @param query the query - * @param options the options - * @return the {@link CosmosPagedIterable} + * @param query the query. + * @param options the options. + * @return the {@link CosmosPagedIterable}. */ public CosmosPagedIterable queryContainers(String query, FeedOptions options) { return getCosmosPagedIterable(databaseWrapper.queryContainers(query, options)); @@ -305,8 +344,8 @@ public CosmosPagedIterable queryContainers(String que /** * Query containers iterator. * - * @param querySpec the query spec - * @return the {@link CosmosPagedIterable} + * @param querySpec the query spec. + * @return the {@link CosmosPagedIterable}. */ public CosmosPagedIterable queryContainers(SqlQuerySpec querySpec) { return getCosmosPagedIterable(databaseWrapper.queryContainers(querySpec)); @@ -315,9 +354,9 @@ public CosmosPagedIterable queryContainers(SqlQuerySp /** * Query containers iterator. * - * @param querySpec the query spec - * @param options the options - * @return the {@link CosmosPagedIterable} + * @param querySpec the query spec. + * @param options the options. + * @return the {@link CosmosPagedIterable}. */ public CosmosPagedIterable queryContainers( SqlQuerySpec querySpec, @@ -326,10 +365,10 @@ public CosmosPagedIterable queryContainers( } /** - * Gets a CosmosContainer object without making a service call + * Gets a CosmosContainer object without making a service call. * - * @param id id of the container - * @return Cosmos Container + * @param id id of the container. + * @return Cosmos Container. */ public CosmosContainer getContainer(String id) { return new CosmosContainer(id, this, databaseWrapper.getContainer(id)); @@ -338,8 +377,8 @@ public CosmosContainer getContainer(String id) { /** * Convert response cosmos sync container response. * - * @param response the response - * @return the cosmos sync container response + * @param response the response. + * @return the cosmos sync container response. */ CosmosContainerResponse convertResponse(CosmosAsyncContainerResponse response) { return ModelBridgeInternal.createCosmosContainerResponse(response, this, client); @@ -350,8 +389,8 @@ CosmosContainerResponse convertResponse(CosmosAsyncContainerResponse response) { /** * Create user cosmos sync user response. * - * @param userProperties the settings - * @return the cosmos sync user response + * @param userProperties the settings. + * @return the cosmos sync user response. */ public CosmosUserResponse createUser(CosmosUserProperties userProperties) { return mapUserResponseAndBlock(databaseWrapper.createUser(userProperties)); @@ -360,8 +399,8 @@ public CosmosUserResponse createUser(CosmosUserProperties userProperties) { /** * Upsert user cosmos sync user response. * - * @param userProperties the settings - * @return the cosmos sync user response + * @param userProperties the settings. + * @return the cosmos sync user response. */ public CosmosUserResponse upsertUser(CosmosUserProperties userProperties) { return mapUserResponseAndBlock(databaseWrapper.upsertUser(userProperties)); @@ -370,7 +409,7 @@ public CosmosUserResponse upsertUser(CosmosUserProperties userProperties) { /** * Read all users {@link CosmosPagedIterable}. * - * @return the {@link CosmosPagedIterable} + * @return the {@link CosmosPagedIterable}. */ public CosmosPagedIterable readAllUsers() { return getCosmosPagedIterable(databaseWrapper.readAllUsers()); @@ -379,8 +418,8 @@ public CosmosPagedIterable readAllUsers() { /** * Read all users {@link CosmosPagedIterable}. * - * @param options the options - * @return the {@link CosmosPagedIterable} + * @param options the options. + * @return the {@link CosmosPagedIterable}. */ public CosmosPagedIterable readAllUsers(FeedOptions options) { return getCosmosPagedIterable(databaseWrapper.readAllUsers(options)); @@ -389,8 +428,8 @@ public CosmosPagedIterable readAllUsers(FeedOptions option /** * Query users {@link CosmosPagedIterable}. * - * @param query the query - * @return the {@link CosmosPagedIterable} + * @param query the query. + * @return the {@link CosmosPagedIterable}. */ public CosmosPagedIterable queryUsers(String query) { return getCosmosPagedIterable(databaseWrapper.queryUsers(query)); @@ -399,9 +438,9 @@ public CosmosPagedIterable queryUsers(String query) { /** * Query users {@link CosmosPagedIterable}. * - * @param query the query - * @param options the options - * @return the {@link CosmosPagedIterable} + * @param query the query. + * @param options the options. + * @return the {@link CosmosPagedIterable}. */ public CosmosPagedIterable queryUsers(String query, FeedOptions options) { return getCosmosPagedIterable(databaseWrapper.queryUsers(query, options)); @@ -410,8 +449,8 @@ public CosmosPagedIterable queryUsers(String query, FeedOp /** * Query users {@link CosmosPagedIterable}. * - * @param querySpec the query spec - * @return the {@link CosmosPagedIterable} + * @param querySpec the query spec. + * @return the {@link CosmosPagedIterable}. */ public CosmosPagedIterable queryUsers(SqlQuerySpec querySpec) { return getCosmosPagedIterable(databaseWrapper.queryUsers(querySpec)); @@ -420,9 +459,9 @@ public CosmosPagedIterable queryUsers(SqlQuerySpec querySp /** * Query users {@link CosmosPagedIterable}. * - * @param querySpec the query spec - * @param options the options - * @return the {@link CosmosPagedIterable} + * @param querySpec the query spec. + * @param options the options. + * @return the {@link CosmosPagedIterable}. */ public CosmosPagedIterable queryUsers(SqlQuerySpec querySpec, FeedOptions options) { return getCosmosPagedIterable(databaseWrapper.queryUsers(querySpec, options)); @@ -431,8 +470,8 @@ public CosmosPagedIterable queryUsers(SqlQuerySpec querySp /** * Gets user. * - * @param id the id - * @return the user + * @param id the id. + * @return the user. */ public CosmosUser getUser(String id) { return new CosmosUser(databaseWrapper.getUser(id), this, id); @@ -455,39 +494,20 @@ private CosmosUserResponse convertUserResponse(CosmosAsyncUserResponse response) return ModelBridgeInternal.createCosmosUserResponse(response, this); } - /** - * Read provisioned throughput integer. - * - * @return the integer. null response indicates database doesn't have any provisioned RUs - */ - public Integer readProvisionedThroughput() { - return throughputResponseToBlock(databaseWrapper.readProvisionedThroughput()); - } - - /** - * Replace provisioned throughput integer. - * - * @param requestUnitsPerSecond the request units per second - * @return the integer - */ - public Integer replaceProvisionedThroughput(int requestUnitsPerSecond) { - return throughputResponseToBlock(databaseWrapper.replaceProvisionedThroughput(requestUnitsPerSecond)); - } - /** * Sets the throughput. * - * @param throughputProperties the throughput properties - * @return the throughput response + * @param throughputProperties the throughput properties. + * @return the throughput response. */ public ThroughputResponse replaceThroughput(ThroughputProperties throughputProperties) { return throughputResponseToBlock(databaseWrapper.replaceThroughput(throughputProperties)); } /** - * Gets the throughput of the database + * Gets the throughput of the database. * - * @return the throughput response + * @return the throughput response. */ public ThroughputResponse readThroughput() { return throughputResponseToBlock(databaseWrapper.readThroughput()); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ConnectionPolicy.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ConnectionPolicy.java index a491fa9319b7..d4154305eb64 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ConnectionPolicy.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ConnectionPolicy.java @@ -14,7 +14,7 @@ import java.util.List; /** - * Represents the Connection policy associated with a DocumentClient in the Azure Cosmos DB database service. + * Represents the Connection policy associated with a Cosmos client in the Azure Cosmos DB service. */ public final class ConnectionPolicy { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/ChangeFeedProcessorBuilderImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/ChangeFeedProcessorBuilderImpl.java index 4d01796a6a31..955623feadf8 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/ChangeFeedProcessorBuilderImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/ChangeFeedProcessorBuilderImpl.java @@ -3,6 +3,10 @@ package com.azure.cosmos.implementation.changefeed.implementation; import com.azure.cosmos.ChangeFeedProcessor; +import com.azure.cosmos.implementation.ChangeFeedOptions; +import com.azure.cosmos.implementation.Strings; +import com.azure.cosmos.implementation.apachecommons.lang.tuple.Pair; +import com.azure.cosmos.implementation.guava25.collect.Streams; import com.azure.cosmos.models.ChangeFeedProcessorOptions; import com.azure.cosmos.CosmosAsyncContainer; import com.azure.cosmos.implementation.changefeed.Bootstrapper; @@ -24,10 +28,13 @@ import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; +import reactor.util.function.Tuple2; import java.net.URI; import java.time.Duration; import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.Consumer; /** @@ -117,6 +124,86 @@ public boolean isStarted() { return this.partitionManager != null && this.partitionManager.isRunning(); } + /** + * Returns the current owner (host) and an approximation of the difference between the last processed item (defined + * by the state of the feed container) and the latest change in the container for each partition (lease + * document). + *

+ * An empty map will be returned if the processor was not started or no lease documents matching the current + * {@link ChangeFeedProcessor} instance's lease prefix could be found. + * + * @return a map representing the current owner and lease token, the current LSN and latest LSN, and the estimated + * lag, asynchronously. + */ + @Override + public Mono> getEstimatedLag() { + Map earlyResult = new ConcurrentHashMap<>(); + + if (this.leaseStoreManager == null || this.feedContextClient == null) { + return Mono.just(earlyResult); + } + + return this.leaseStoreManager.getAllLeases() + .flatMap(lease -> { + ChangeFeedOptions options = new ChangeFeedOptions() + .setMaxItemCount(1) + .setPartitionKeyRangeId(lease.getLeaseToken()) + .setStartFromBeginning(true) + .setRequestContinuation(lease.getContinuationToken()); + + return this.feedContextClient.createDocumentChangeFeedQuery(this.feedContextClient.getContainerClient(), options) + .take(1) + .map(feedResponse -> { + final String pkRangeIdSeparator = ":"; + final String segmentSeparator = "#"; + final String lsnPropertyName = "_lsn"; + String ownerValue = lease.getOwner(); + String sessionTokenLsn = feedResponse.getSessionToken(); + String parsedSessionToken = sessionTokenLsn.substring(sessionTokenLsn.indexOf(pkRangeIdSeparator)); + String[] segments = parsedSessionToken.split(segmentSeparator); + String latestLsn = segments[0]; + + if (segments.length >= 2) { + // default to Global LSN + latestLsn = segments[1]; + } + + if (ownerValue == null) { + ownerValue = ""; + } + + // An empty list of documents returned means that we are current (zero lag) + if (feedResponse.getResults() == null || feedResponse.getResults().size() == 0) { + return Pair.of(ownerValue + "_" + lease.getLeaseToken(), 0); + } + + Integer currentLsn = Integer.valueOf(feedResponse.getResults().get(0).get(lsnPropertyName).asText("0")); + Integer estimatedLag = Integer.valueOf(latestLsn); + estimatedLag = estimatedLag - currentLsn + 1; + + return Pair.of(ownerValue + "_" + lease.getLeaseToken() + "_" + currentLsn + "_" + latestLsn, estimatedLag); + }); + }) + .collectList() + .map(valueList -> { + Map result = new ConcurrentHashMap<>(); + for (Pair pair : valueList) { + result.put(pair.getKey(), pair.getValue()); + } + return result; + }); + +// this.options = new ChangeFeedOptions(); +// this.options.setMaxItemCount(settings.getMaxItemCount()); +// this.options.setPartitionKeyRangeId(settings.getPartitionKeyRangeId()); +// // this.setOptions.getSessionToken(getProperties.getSessionToken()); +// this.options.setStartFromBeginning(settings.isStartFromBeginning()); +// this.options.setRequestContinuation(settings.getStartContinuation()); +// this.options.setStartDateTime(settings.getStartTime()); +// .flatMap(value -> this.documentClient.createDocumentChangeFeedQuery(this.settings.getCollectionSelfLink(), +// this.options) + } + /** * Sets the host name. * diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/ChangeFeedProcessorOptions.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/ChangeFeedProcessorOptions.java index 17f492b39c20..902e48b036af 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/ChangeFeedProcessorOptions.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/ChangeFeedProcessorOptions.java @@ -28,7 +28,6 @@ public final class ChangeFeedProcessorOptions { private boolean startFromBeginning; private int minScaleCount; private int maxScaleCount; - private boolean discardExistingLeases; /** * Instantiates a new Change feed processor options. @@ -181,10 +180,10 @@ public ChangeFeedProcessorOptions setMaxItemCount(int maxItemCount) { /** * Gets the start request continuation token to start looking for changes after. *

- * This is only used when lease store is not initialized and is ignored if a lease for partition exists and - * has continuation token. If this is specified, both StartTime and StartFromBeginning are ignored. + * This option can be used when lease store is not initialized and it is ignored if a lease document exists and + * has continuation token that is not null. If this is specified, both StartTime and StartFromBeginning are ignored. * - * @return the start request continuation token to start looking for changes after. + * @return the string representing a continuation token that will be used to get document feeds starting with. */ public String getStartContinuation() { return this.startContinuation; @@ -193,11 +192,11 @@ public String getStartContinuation() { /** * Sets the start request continuation token to start looking for changes after. *

- * This is only used when lease store is not initialized and is ignored if a lease for partition exists and - * has continuation token. If this is specified, both StartTime and StartFromBeginning are ignored. + * This option can be used when lease store is not initialized and it is ignored if a lease document exists and + * has continuation token that is not null. If this is specified, both StartTime and StartFromBeginning are ignored. * * @param startContinuation the start request continuation token to start looking for changes after. - * @return the current ChangeFeedProcessorOptions instance. + * @return the string representing a continuation token that will be used to get document feeds starting with. */ public ChangeFeedProcessorOptions setStartContinuation(String startContinuation) { this.startContinuation = startContinuation; @@ -207,10 +206,11 @@ public ChangeFeedProcessorOptions setStartContinuation(String startContinuation) /** * Gets the time (exclusive) to start looking for changes after. *

- * This is only used when: - * (1) Lease store is not initialized and is ignored if a lease for partition exists and has continuation token. - * (2) StartContinuation is not specified. - * If this is specified, StartFromBeginning is ignored. + * This option can be used when: + * (1) Lease documents are not initialized; this setting will be ignored if the lease documents exists and have a + * valid continuation token. + * (2) Start continuation token option is not specified. + * If this option is specified, "start from beginning" option is ignored. * * @return the time (exclusive) to start looking for changes after. */ @@ -221,10 +221,11 @@ public OffsetDateTime getStartTime() { /** * Sets the time (exclusive) to start looking for changes after (UTC time). *

- * This is only used when: - * (1) Lease store is not initialized and is ignored if a lease for partition exists and has continuation token. - * (2) StartContinuation is not specified. - * If this is specified, StartFromBeginning is ignored. + * This option can be used when: + * (1) Lease documents are not initialized; this setting will be ignored if the lease documents exists and have a + * valid continuation token. + * (2) Start continuation token option is not specified. + * If this option is specified, "start from beginning" option is ignored. * * @param startTime the time (exclusive) to start looking for changes after. * @return the current ChangeFeedProcessorOptions instance. @@ -238,10 +239,11 @@ public ChangeFeedProcessorOptions setStartTime(OffsetDateTime startTime) { * Gets a value indicating whether change feed in the Azure Cosmos DB service should start from beginning (true) * or from current (false). By default it's start from current (false). *

- * This is only used when: - * (1) Lease store is not initialized and is ignored if a lease for partition exists and has continuation token. - * (2) StartContinuation is not specified. - * (3) StartTime is not specified. + * This option can be used when: + * (1) Lease documents are not initialized; this setting will be ignored if the lease documents exists and have a + * valid continuation token. + * (2) Start continuation token option is not specified. + * (3) Start time option is not specified. * * @return a value indicating whether change feed in the Azure Cosmos DB service should start from. */ @@ -252,10 +254,11 @@ public boolean isStartFromBeginning() { /** * Sets a value indicating whether change feed in the Azure Cosmos DB service should start from beginning. *

- * This is only used when: - * (1) Lease store is not initialized and is ignored if a lease for partition exists and has continuation token. - * (2) StartContinuation is not specified. - * (3) StartTime is not specified. + * This option can be used when: + * (1) Lease documents are not initialized; this setting will be ignored if the lease documents exists and have a + * valid continuation token. + * (2) Start continuation token option is not specified. + * (3) Start time option is not specified. * * @param startFromBeginning Indicates to start from beginning if true * @return the current ChangeFeedProcessorOptions instance. @@ -266,10 +269,10 @@ public ChangeFeedProcessorOptions setStartFromBeginning(boolean startFromBeginni } /** - * Gets the minimum partition count for the host. + * Gets the minimum partition count (parallel workers) for the current host. *

- * This can be used to increase the number of partitions for the host and thus override equal distribution (which - * is the default) of leases between hosts. + * This option can be used to increase the number of partitions (parallel workers) for the host and thus override + * the default equal distribution of leases between multiple hosts. * * @return the minimum scale count for the host. */ @@ -278,10 +281,10 @@ public int getMinScaleCount() { } /** - * Sets the minimum partition count for the host. + * Sets the minimum partition count (parallel workers) for the current host. *

- * This can be used to increase the number of partitions for the host and thus override equal distribution (which - * is the default) of leases between hosts. + * This option can be used to increase the number of partitions (parallel workers) for the host and thus override + * the default equal distribution of leases between multiple hosts. * * @param minScaleCount the minimum partition count for the host. * @return the current ChangeFeedProcessorOptions instance. @@ -292,48 +295,28 @@ public ChangeFeedProcessorOptions setMinScaleCount(int minScaleCount) { } /** - * Gets the maximum number of partitions the host can serve. + * Gets the maximum number of partitions (parallel workers) the host can run. *

- * This can be used property to limit the number of partitions for the host and thus override equal distribution - * (which is the default) of leases between hosts. DEFAULT is 0 (unlimited). + * This option can be used to limit the number of partitions (parallel workers) for the host and thus override + * the default equal distribution of leases between multiple hosts. Default setting is "0", unlimited. * - * @return the maximum number of partitions the host can serve. + * @return the maximum number of partitions (parallel workers) the host can run. */ public int getMaxScaleCount() { return this.maxScaleCount; } /** - * Sets the maximum number of partitions the host can serve. + * Sets the maximum number of partitions (parallel workers) the host can run. + *

+ * This option can be used to limit the number of partitions (parallel workers) for the host and thus override + * the default equal distribution of leases between multiple hosts. Default setting is "0", unlimited. * - * @param maxScaleCount the maximum number of partitions the host can serve. + * @param maxScaleCount the maximum number of partitions (parallel workers) the host can run. * @return the current ChangeFeedProcessorOptions instance. */ public ChangeFeedProcessorOptions setMaxScaleCount(int maxScaleCount) { this.maxScaleCount = maxScaleCount; return this; } - - /** - * Gets a value indicating whether on start of the host all existing leases should be deleted and the host - * should start from scratch. - * - * @return a value indicating whether on start of the host all existing leases should be deleted and the host - * should start from scratch. - */ - public boolean isExistingLeasesDiscarded() { - return this.discardExistingLeases; - } - - /** - * Sets a value indicating whether on start of the host all existing leases should be deleted and the host - * should start from scratch. - * - * @param discardExistingLeases Indicates whether to discard all existing leases if true - * @return the current ChangeFeedProcessorOptions instance. - */ - public ChangeFeedProcessorOptions setExistingLeasesDiscarded(boolean discardExistingLeases) { - this.discardExistingLeases = discardExistingLeases; - return this; - } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosContainerRequestOptions.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosContainerRequestOptions.java index bdfbf57beb69..cb46a1778e99 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosContainerRequestOptions.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosContainerRequestOptions.java @@ -6,10 +6,9 @@ import com.azure.cosmos.implementation.RequestOptions; /** - * Encapsulates options that can be specified for a request issued to cosmos container. + * Encapsulates options that can be specified for a request issued to Cosmos container. */ public final class CosmosContainerRequestOptions { - private Integer offerThroughput; private boolean populateQuotaInfo; private ConsistencyLevel consistencyLevel; private String sessionToken; @@ -17,26 +16,6 @@ public final class CosmosContainerRequestOptions { private String ifNoneMatchETag; private ThroughputProperties throughputProperties; - /** - * Gets the throughput in the form of Request Units per second when creating a cosmos container. - * - * @return the throughput value. - */ - Integer getOfferThroughput() { - return offerThroughput; - } - - /** - * Sets the throughput in the form of Request Units per second when creating a cosmos container. - * - * @param offerThroughput the throughput value. - * @return the current request options - */ - CosmosContainerRequestOptions setOfferThroughput(Integer offerThroughput) { - this.offerThroughput = offerThroughput; - return this; - } - /** * Gets the PopulateQuotaInfo setting for cosmos container read requests in the Azure Cosmos DB database service. * PopulateQuotaInfo is used to enable/disable getting cosmos container quota related stats for document @@ -150,7 +129,6 @@ RequestOptions toRequestOptions() { RequestOptions options = new RequestOptions(); options.setIfMatchETag(getIfMatchETag()); options.setIfNoneMatchETag(getIfNoneMatchETag()); - options.setOfferThroughput(offerThroughput); options.setPopulateQuotaInfo(populateQuotaInfo); options.setSessionToken(sessionToken); options.setConsistencyLevel(consistencyLevel); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosDatabaseRequestOptions.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosDatabaseRequestOptions.java index 94a2e9e0e10c..fec9bf1acfd1 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosDatabaseRequestOptions.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosDatabaseRequestOptions.java @@ -8,7 +8,6 @@ * Encapsulates options that can be specified for a request issued to cosmos database. */ public final class CosmosDatabaseRequestOptions { - private Integer offerThroughput; private String ifMatchETag; private String ifNoneMatchETag; private ThroughputProperties throughputProperties; @@ -53,26 +52,6 @@ public CosmosDatabaseRequestOptions setIfNoneMatchETag(String ifNoneMatchETag) { return this; } - /** - * Gets the throughput in the form of Request Units per second when creating a cosmos database. - * - * @return the throughput value. - */ - Integer getOfferThroughput() { - return offerThroughput; - } - - /** - * Sets the throughput in the form of Request Units per second when creating a cosmos database. - * - * @param offerThroughput the throughput value. - * @return the current request options - */ - CosmosDatabaseRequestOptions setOfferThroughput(Integer offerThroughput) { - this.offerThroughput = offerThroughput; - return this; - } - CosmosDatabaseRequestOptions setThroughputProperties(ThroughputProperties throughputProperties) { this.throughputProperties = throughputProperties; return this; @@ -82,7 +61,6 @@ RequestOptions toRequestOptions() { RequestOptions options = new RequestOptions(); options.setIfMatchETag(getIfMatchETag()); options.setIfNoneMatchETag(getIfNoneMatchETag()); - options.setOfferThroughput(offerThroughput); options.setThroughputProperties(this.throughputProperties); return options; } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/ModelBridgeInternal.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/ModelBridgeInternal.java index e61da8ba2c3a..0586662cd14a 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/ModelBridgeInternal.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/ModelBridgeInternal.java @@ -257,32 +257,38 @@ public static RequestOptions toRequestOptions(CosmosContainerRequestOptions cosm return cosmosContainerRequestOptions.toRequestOptions(); } - @Warning(value = INTERNAL_USE_ONLY_WARNING) - public static CosmosContainerRequestOptions setOfferThroughput(CosmosContainerRequestOptions cosmosContainerRequestOptions, - Integer offerThroughput) { - return cosmosContainerRequestOptions.setOfferThroughput(offerThroughput); - } +// @Warning(value = INTERNAL_USE_ONLY_WARNING) +// public static CosmosContainerRequestOptions setOfferThroughput(CosmosContainerRequestOptions cosmosContainerRequestOptions, +// Integer offerThroughput) { +// return cosmosContainerRequestOptions.setOfferThroughput(offerThroughput); +// } +// +// @Warning(value = INTERNAL_USE_ONLY_WARNING) +// public static CosmosContainerRequestOptions setThroughputProperties(CosmosContainerRequestOptions cosmosContainerRequestOptions, +// ThroughputProperties throughputProperties) { +// return cosmosContainerRequestOptions.setThroughputProperties(throughputProperties); +// } @Warning(value = INTERNAL_USE_ONLY_WARNING) public static RequestOptions toRequestOptions(CosmosDatabaseRequestOptions cosmosDatabaseRequestOptions) { return cosmosDatabaseRequestOptions.toRequestOptions(); } - @Warning(value = INTERNAL_USE_ONLY_WARNING) - public static CosmosDatabaseRequestOptions setOfferThroughput(CosmosDatabaseRequestOptions cosmosDatabaseRequestOptions, - Integer offerThroughput) { - return cosmosDatabaseRequestOptions.setOfferThroughput(offerThroughput); - } +// @Warning(value = INTERNAL_USE_ONLY_WARNING) +// public static CosmosDatabaseRequestOptions setOfferThroughput(CosmosDatabaseRequestOptions cosmosDatabaseRequestOptions, +// Integer offerThroughput) { +// return cosmosDatabaseRequestOptions.setOfferThroughput(offerThroughput); +// } @Warning(value = INTERNAL_USE_ONLY_WARNING) - public static CosmosDatabaseRequestOptions setOfferProperties( + public static CosmosDatabaseRequestOptions setThroughputProperties( CosmosDatabaseRequestOptions cosmosDatabaseRequestOptions, ThroughputProperties throughputProperties) { return cosmosDatabaseRequestOptions.setThroughputProperties(throughputProperties); } @Warning(value = INTERNAL_USE_ONLY_WARNING) - public static CosmosContainerRequestOptions setOfferProperties( + public static CosmosContainerRequestOptions setThroughputProperties( CosmosContainerRequestOptions containerRequestOptions, ThroughputProperties throughputProperties) { return containerRequestOptions.setThroughputProperties(throughputProperties); diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosContainerTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosContainerTest.java index e64efdf86c9f..eed6e49e8826 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosContainerTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosContainerTest.java @@ -14,6 +14,7 @@ import com.azure.cosmos.models.IndexingPolicy; import com.azure.cosmos.models.PartitionKeyDefinition; import com.azure.cosmos.models.SqlQuerySpec; +import com.azure.cosmos.models.ThroughputProperties; import com.azure.cosmos.rx.TestSuiteBase; import com.azure.cosmos.implementation.HttpConstants; import com.azure.cosmos.util.CosmosPagedIterable; @@ -169,7 +170,7 @@ public void createContainer_withNamePartitionPathAndThroughput() throws Exceptio int throughput = 1000; CosmosContainerResponse containerResponse = createdDatabase.createContainer(collectionName, - partitionKeyPath, throughput); + partitionKeyPath, ThroughputProperties.createManualThroughput(throughput)); validateContainerResponse(new CosmosContainerProperties(collectionName, partitionKeyPath), containerResponse); } diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosDatabaseTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosDatabaseTest.java index f74a13840355..a7982fcb40cb 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosDatabaseTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosDatabaseTest.java @@ -11,6 +11,7 @@ import com.azure.cosmos.models.CosmosDatabaseResponse; import com.azure.cosmos.models.FeedOptions; import com.azure.cosmos.models.SqlQuerySpec; +import com.azure.cosmos.models.ThroughputProperties; import com.azure.cosmos.rx.TestSuiteBase; import com.azure.cosmos.implementation.HttpConstants; import com.azure.cosmos.util.CosmosPagedIterable; @@ -104,7 +105,7 @@ public void createDatabase_withPropertiesThroughputAndOptions() throws Exception CosmosDatabaseRequestOptions requestOptions = new CosmosDatabaseRequestOptions(); int throughput = 400; try { - CosmosDatabaseResponse createResponse = client.createDatabase(databaseProperties, throughput, requestOptions); + CosmosDatabaseResponse createResponse = client.createDatabase(databaseProperties, ThroughputProperties.createManualThroughput(throughput), requestOptions); validateDatabaseResponse(databaseDefinition, createResponse); } catch (CosmosException ex) { assertThat(ex.getStatusCode()).isEqualTo(HttpConstants.StatusCodes.FORBIDDEN); @@ -117,7 +118,7 @@ public void createDatabase_withPropertiesAndThroughput() throws Exception { CosmosDatabaseProperties databaseProperties = new CosmosDatabaseProperties(databaseDefinition.getId()); int throughput = 1000; try { - CosmosDatabaseResponse createResponse = client.createDatabase(databaseProperties, throughput); + CosmosDatabaseResponse createResponse = client.createDatabase(databaseProperties, ThroughputProperties.createManualThroughput(throughput)); validateDatabaseResponse(databaseDefinition, createResponse); } catch (Exception ex) { if (ex instanceof CosmosException) { @@ -133,7 +134,7 @@ public void createDatabase_withIdAndThroughput() throws Exception { CosmosDatabaseProperties databaseDefinition = new CosmosDatabaseProperties(CosmosDatabaseForTest.generateId()); int throughput = 1000; try { - CosmosDatabaseResponse createResponse = client.createDatabase(databaseDefinition.getId(), throughput); + CosmosDatabaseResponse createResponse = client.createDatabase(databaseDefinition.getId(), ThroughputProperties.createManualThroughput(throughput)); validateDatabaseResponse(databaseDefinition, createResponse); } catch (Exception ex) { if (ex instanceof CosmosException) { diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/models/CosmosPartitionKeyTests.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/models/CosmosPartitionKeyTests.java index 77f125610654..31901d845820 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/models/CosmosPartitionKeyTests.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/models/CosmosPartitionKeyTests.java @@ -203,7 +203,7 @@ public void nonPartitionedCollectionOperations() throws Exception { .build(); validateQuerySuccess(queryFlux.byPage(), queryValidator); - queryFlux = createdContainer.readAllItems(feedOptions, CosmosItemProperties.class); + queryFlux = createdContainer.queryItems("SELECT * FROM r", feedOptions, CosmosItemProperties.class); queryValidator = new FeedResponseListValidator.Builder() .totalSize(3) .numberOfPages(1) @@ -238,7 +238,7 @@ public void nonPartitionedCollectionOperations() throws Exception { // 3 previous items + 1 created from the sproc expectedIds.add(documentCreatedBySprocId); - queryFlux = createdContainer.readAllItems(feedOptions, CosmosItemProperties.class); + queryFlux = createdContainer.queryItems("SELECT * FROM r", feedOptions, CosmosItemProperties.class); queryValidator = new FeedResponseListValidator.Builder() .totalSize(4) .numberOfPages(1) @@ -276,7 +276,7 @@ public void nonPartitionedCollectionOperations() throws Exception { .build(); this.validateItemSuccess(deleteMono, deleteResponseValidator); - queryFlux = createdContainer.readAllItems(feedOptions, CosmosItemProperties.class); + queryFlux = createdContainer.queryItems("SELECT * FROM r", feedOptions, CosmosItemProperties.class); queryValidator = new FeedResponseListValidator.Builder() .totalSize(0) .numberOfPages(1) diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/BackPressureTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/BackPressureTest.java index 303064badea2..13ae8c444b9f 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/BackPressureTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/BackPressureTest.java @@ -69,7 +69,8 @@ public BackPressureTest(CosmosClientBuilder clientBuilder) { public void readFeedPages() throws Exception { FeedOptions options = new FeedOptions(); - CosmosPagedFlux queryObservable = createdCollection.readAllItems(options, CosmosItemProperties.class); + CosmosPagedFlux queryObservable = createdCollection + .queryItems("SELECT * FROM r", options, CosmosItemProperties.class); RxDocumentClientUnderTest rxClient = (RxDocumentClientUnderTest) CosmosBridgeInternal.getAsyncDocumentClient(client); AtomicInteger valueCount = new AtomicInteger(); @@ -113,7 +114,8 @@ public void readFeedPages() throws Exception { public void readFeedItems() throws Exception { FeedOptions options = new FeedOptions(); - CosmosPagedFlux queryObservable = createdCollection.readAllItems(options, CosmosItemProperties.class); + CosmosPagedFlux queryObservable = createdCollection + .queryItems("SELECT * FROM r", options, CosmosItemProperties.class); RxDocumentClientUnderTest rxClient = (RxDocumentClientUnderTest) CosmosBridgeInternal.getAsyncDocumentClient(client); AtomicInteger valueCount = new AtomicInteger(); diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/ChangeFeedProcessorTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/ChangeFeedProcessorTest.java index a68cdefeb623..9c4b91ab534e 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/ChangeFeedProcessorTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/ChangeFeedProcessorTest.java @@ -20,6 +20,7 @@ import com.azure.cosmos.implementation.CosmosItemProperties; import com.azure.cosmos.implementation.Utils; import com.azure.cosmos.implementation.changefeed.ServiceItemLease; +import com.azure.cosmos.models.ThroughputProperties; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; @@ -97,7 +98,6 @@ public void readFeedDocumentsStartFromBeginning() throws InterruptedException { .setMaxItemCount(10) .setStartFromBeginning(true) .setMaxScaleCount(0) // unlimited - .setExistingLeasesDiscarded(true) ) .build(); @@ -161,7 +161,6 @@ public void readFeedDocumentsStartFromCustomDate() throws InterruptedException { .setStartTime(OffsetDateTime.now().minusDays(1)) .setMinScaleCount(1) .setMaxScaleCount(3) - .setExistingLeasesDiscarded(true) ) .build(); @@ -199,6 +198,83 @@ public void readFeedDocumentsStartFromCustomDate() throws InterruptedException { } } + @Test(groups = { "emulator" }, timeOut = 50 * CHANGE_FEED_PROCESSOR_TIMEOUT) + public void getEstimatedLag() throws InterruptedException { + CosmosAsyncContainer createdFeedCollection = createFeedCollection(FEED_COLLECTION_THROUGHPUT); + CosmosAsyncContainer createdLeaseCollection = createLeaseCollection(LEASE_COLLECTION_THROUGHPUT); + + try { + List createdDocuments = new ArrayList<>(); + Map receivedDocuments = new ConcurrentHashMap<>(); + ChangeFeedProcessor changeFeedProcessor = ChangeFeedProcessor.changeFeedProcessorBuilder() + .hostName(hostName) + .handleChanges((List docs) -> { + ChangeFeedProcessorTest.log.info("START processing from thread {}", Thread.currentThread().getId()); + for (JsonNode item : docs) { + processItem(item, receivedDocuments); + } + ChangeFeedProcessorTest.log.info("END processing from thread {}", Thread.currentThread().getId()); + }) + .feedContainer(createdFeedCollection) + .leaseContainer(createdLeaseCollection) + .build(); + + try { + changeFeedProcessor.start().subscribeOn(Schedulers.elastic()) + .timeout(Duration.ofMillis(2 * CHANGE_FEED_PROCESSOR_TIMEOUT)) + .then(Mono.just(changeFeedProcessor) + .delayElement(Duration.ofMillis(2 * CHANGE_FEED_PROCESSOR_TIMEOUT)) + .flatMap(value -> changeFeedProcessor.stop() + .subscribeOn(Schedulers.elastic()) + .timeout(Duration.ofMillis(2 * CHANGE_FEED_PROCESSOR_TIMEOUT)) + )) + .subscribe(); + } catch (Exception ex) { + log.error("Change feed processor did not start and stopped in the expected time", ex); + throw ex; + } + + Thread.sleep(4 * CHANGE_FEED_PROCESSOR_TIMEOUT); + + // Test for "zero" lag + Map estimatedLagResult = changeFeedProcessor.getEstimatedLag() + .map(getEstimatedLag -> { + System.out.println(getEstimatedLag); + return getEstimatedLag; + }).block(); + + int totalLag = 0; + for (int lag : estimatedLagResult.values()) { + totalLag += lag; + } + + assertThat(totalLag == 0).as("Change Feed Processor estimated total lag at start").isTrue(); + + // Test for "FEED_COUNT total lag + setupReadFeedDocuments(createdDocuments, receivedDocuments, createdFeedCollection, FEED_COUNT); + + estimatedLagResult = changeFeedProcessor.getEstimatedLag() + .map(getEstimatedLag -> { + System.out.println(getEstimatedLag); + return getEstimatedLag; + }).block(); + + totalLag = 0; + for (int lag : estimatedLagResult.values()) { + totalLag += lag; + } + + assertThat(totalLag == FEED_COUNT).as("Change Feed Processor estimated total lag").isTrue(); + + } finally { + safeDeleteCollection(createdFeedCollection); + safeDeleteCollection(createdLeaseCollection); + + // Allow some time for the collections to be deleted before exiting. + Thread.sleep(500); + } + } + @Test(groups = { "emulator" }, timeOut = 50 * CHANGE_FEED_PROCESSOR_TIMEOUT) public void staledLeaseAcquiring() throws InterruptedException { final String ownerFirst = "Owner_First"; @@ -376,9 +452,12 @@ public void readFeedDocumentsAfterSplit() throws InterruptedException { }) .then( // increase throughput to force a single partition collection to go through a split - createdFeedCollectionForSplit.readProvisionedThroughput().subscribeOn(Schedulers.elastic()) + createdFeedCollectionForSplit + .readThroughput().subscribeOn(Schedulers.elastic()) .flatMap(currentThroughput -> - createdFeedCollectionForSplit.replaceProvisionedThroughput(FEED_COLLECTION_THROUGHPUT).subscribeOn(Schedulers.elastic()) + createdFeedCollectionForSplit + .replaceThroughput(ThroughputProperties.createManualThroughput(FEED_COLLECTION_THROUGHPUT)) + .subscribeOn(Schedulers.elastic()) ) .then() ) diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/CollectionCrudTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/CollectionCrudTest.java index c16aa56ff622..36c16441b5c3 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/CollectionCrudTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/CollectionCrudTest.java @@ -28,6 +28,7 @@ import com.azure.cosmos.implementation.Database; import com.azure.cosmos.implementation.FailureValidator; import com.azure.cosmos.implementation.RetryAnalyzer; +import com.azure.cosmos.models.ThroughputProperties; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; @@ -378,15 +379,19 @@ public void replaceProvisionedThroughput(){ .block() .getDatabase(); CosmosContainerProperties containerProperties = new CosmosContainerProperties("testCol", "/myPk"); - CosmosAsyncContainer container = database.createContainer(containerProperties, 1000, + CosmosAsyncContainer container = database.createContainer(containerProperties, ThroughputProperties.createManualThroughput(1000), new CosmosContainerRequestOptions()) .block() .getContainer(); - Integer throughput = container.readProvisionedThroughput().block(); + Integer throughput = container.readThroughput().block() + .getProperties().getManualThroughput(); assertThat(throughput).isEqualTo(1000); - throughput = container.replaceProvisionedThroughput(2000).block(); + throughput = container.replaceThroughput(ThroughputProperties.createManualThroughput(2000)) + .block() + .getProperties() + .getManualThroughput(); assertThat(throughput).isEqualTo(2000); } diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/ReadFeedDocumentsTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/ReadFeedDocumentsTest.java index 16df31ea88dd..3c4b113f1eeb 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/ReadFeedDocumentsTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/ReadFeedDocumentsTest.java @@ -40,7 +40,8 @@ public void readDocuments() { FeedOptions options = new FeedOptions(); int maxItemCount = 2; - CosmosPagedFlux feedObservable = createdCollection.readAllItems(options, CosmosItemProperties.class); + CosmosPagedFlux feedObservable = createdCollection + .queryItems("SELECT * FROM r", options, CosmosItemProperties.class); FeedResponseListValidator validator = new FeedResponseListValidator.Builder() .totalSize(createdDocuments.size()) .numberOfPagesIsGreaterThanOrEqualTo(1) @@ -59,7 +60,8 @@ public void readDocuments_withoutEnableCrossPartitionQuery() { FeedOptions options = new FeedOptions(); int maxItemCount = 2; - CosmosPagedFlux feedObservable = createdCollection.readAllItems(options, CosmosItemProperties.class); + CosmosPagedFlux feedObservable = createdCollection + .queryItems("SELECT * FROM r", options, CosmosItemProperties.class); FeedResponseListValidator validator = new FeedResponseListValidator.Builder() .totalSize(createdDocuments.size()) diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/SinglePartitionReadFeedDocumentsTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/SinglePartitionReadFeedDocumentsTest.java index 75d3c0833a3a..3043a542f89e 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/SinglePartitionReadFeedDocumentsTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/SinglePartitionReadFeedDocumentsTest.java @@ -37,7 +37,8 @@ public void readDocuments() { final FeedOptions options = new FeedOptions(); int maxItemCount = 2; - final CosmosPagedFlux feedObservable = createdCollection.readAllItems(options, CosmosItemProperties.class); + final CosmosPagedFlux feedObservable = createdCollection + .queryItems("SELECT * FROM r", options, CosmosItemProperties.class); final int expectedPageSize = (createdDocuments.size() + maxItemCount - 1) / maxItemCount; FeedResponseListValidator validator = new FeedResponseListValidator.Builder() diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java index 104d45632de7..561b5ef0d460 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java @@ -53,6 +53,7 @@ import com.azure.cosmos.models.PartitionKey; import com.azure.cosmos.models.PartitionKeyDefinition; import com.azure.cosmos.models.SqlQuerySpec; +import com.azure.cosmos.models.ThroughputProperties; import com.azure.cosmos.util.CosmosPagedFlux; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.type.TypeReference; @@ -335,7 +336,7 @@ protected static void waitIfNeededForReplicasToCatchUp(CosmosClientBuilder clien public static CosmosAsyncContainer createCollection(CosmosAsyncDatabase database, CosmosContainerProperties cosmosContainerProperties, CosmosContainerRequestOptions options, int throughput) { - return database.createContainer(cosmosContainerProperties, throughput, options).block().getContainer(); + return database.createContainer(cosmosContainerProperties, ThroughputProperties.createManualThroughput(throughput), options).block().getContainer(); } public static CosmosAsyncContainer createCollection(CosmosAsyncDatabase database, CosmosContainerProperties cosmosContainerProperties,