Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.azure.cosmos.models.FeedOptions;
import com.azure.cosmos.models.ModelBridgeInternal;
import com.azure.cosmos.models.SqlQuerySpec;
import com.azure.cosmos.models.ThroughputProperties;
import com.azure.cosmos.util.CosmosPagedFlux;
import com.azure.cosmos.util.UtilBridgeInternal;
import io.micrometer.core.instrument.MeterRegistry;
Expand Down Expand Up @@ -323,6 +324,19 @@ public Mono<CosmosAsyncDatabaseResponse> createDatabase(String id, int throughpu
return createDatabase(new CosmosDatabaseProperties(id), options);
}

/**
* Creates a database.
*
* @param id the id
* @param throughputProperties the throughputProperties
* @return the mono
*/
public Mono<CosmosAsyncDatabaseResponse> createDatabase(String id, ThroughputProperties throughputProperties) {
CosmosDatabaseRequestOptions options = new CosmosDatabaseRequestOptions();
ModelBridgeInternal.setOfferProperties(options, throughputProperties);
return createDatabase(new CosmosDatabaseProperties(id), options);
}

/**
* Reads all databases.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import com.azure.cosmos.models.ModelBridgeInternal;
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.models.SqlQuerySpec;
import com.azure.cosmos.models.ThroughputProperties;
import com.azure.cosmos.models.ThroughputResponse;
import com.azure.cosmos.util.CosmosPagedFlux;
import com.azure.cosmos.util.UtilBridgeInternal;
import reactor.core.publisher.Flux;
Expand Down Expand Up @@ -661,6 +663,70 @@ public Mono<Integer> replaceProvisionedThroughput(int requestUnitsPerSecond) {
}).map(offerResourceResponse -> offerResourceResponse.getResource().getThroughput());
}

/**
* Replace the throughput .
*
* @param throughputProperties the throughput properties
* @return the mono containing throughput response
*/
public Mono<ThroughputResponse> replaceThroughput(ThroughputProperties throughputProperties) {
return this.read()
.flatMap(response -> this.database.getDocClientWrapper()
.queryOffers(database.getOfferQuerySpecFromResourceId(response
.getProperties()
.getResourceId())
, new FeedOptions())
.single()
.flatMap(offerFeedResponse -> {
if (offerFeedResponse.getResults().isEmpty()) {
return Mono.error(BridgeInternal
.createCosmosClientException(
HttpConstants.StatusCodes.BADREQUEST,
"No offers found for the " +
"resource " + this.getId()));
}

Offer existingOffer = offerFeedResponse.getResults().get(0);
Offer updatedOffer =
ModelBridgeInternal.updateOfferFromProperties(existingOffer,
throughputProperties);
return this.database.getDocClientWrapper()
.replaceOffer(updatedOffer)
.single();
}).map(ModelBridgeInternal::createThroughputRespose));
}

/**
* Read the throughput throughput .
*
* @return the mono containing throughput response
*/
public Mono<ThroughputResponse> readThroughput() {
return this.read()
.flatMap(response -> this.database.getDocClientWrapper()
.queryOffers(database.getOfferQuerySpecFromResourceId(response
.getProperties()
.getResourceId())
, new FeedOptions())
.single()
.flatMap(offerFeedResponse -> {
if (offerFeedResponse.getResults().isEmpty()) {
return Mono.error(BridgeInternal
.createCosmosClientException(
HttpConstants.StatusCodes.BADREQUEST,
"No offers found for the resource "
+ this.getId()));
}
return this.database.getDocClientWrapper()
.readOffer(offerFeedResponse.getResults()
.get(0)
.getSelfLink())
.single();
})
.map(ModelBridgeInternal::createThroughputRespose));
}


/**
* Gets the parent Database
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,24 @@
import com.azure.cosmos.models.CosmosAsyncUserResponse;
import com.azure.cosmos.models.CosmosContainerProperties;
import com.azure.cosmos.models.CosmosContainerRequestOptions;
import com.azure.cosmos.models.CosmosDatabaseProperties;
import com.azure.cosmos.models.CosmosDatabaseRequestOptions;
import com.azure.cosmos.models.CosmosUserProperties;
import com.azure.cosmos.models.FeedOptions;
import com.azure.cosmos.models.ModelBridgeInternal;
import com.azure.cosmos.models.SqlParameter;
import com.azure.cosmos.models.SqlQuerySpec;
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
import com.azure.cosmos.models.ThroughputProperties;
import com.azure.cosmos.models.ThroughputResponse;
import com.azure.cosmos.util.CosmosPagedFlux;
import com.azure.cosmos.util.UtilBridgeInternal;
import reactor.core.Exceptions;
import reactor.core.publisher.Mono;

import java.util.Collections;
import java.util.List;

import static com.azure.cosmos.implementation.Utils.setContinuationTokenAndMaxItemCount;

/**
Expand Down Expand Up @@ -154,6 +161,22 @@ public Mono<CosmosAsyncContainerResponse> createContainer(
return createContainer(containerProperties, options);
}

/**
* Creates a container.
*
* @param containerProperties the container properties
* @param throughputProperties the throughput properties
* @param options the request options
* @return the mono
*/
public Mono<CosmosAsyncContainerResponse> createContainer(
CosmosContainerProperties containerProperties,
ThroughputProperties throughputProperties,
CosmosContainerRequestOptions options){
ModelBridgeInternal.setOfferProperties(options, throughputProperties);
return createContainer(containerProperties, options);
}

/**
* Creates a document container.
* <p>
Expand Down Expand Up @@ -663,6 +686,79 @@ public Mono<Integer> replaceProvisionedThroughput(int requestUnitsPerSecond) {
.getThroughput()));
}

/**
* Sets throughput provisioned for a container in measurement of
* Requests-per-Unit in the Azure Cosmos service.
*
* @param throughputProperties the throughput properties
* @return the mono
*/
public Mono<ThroughputResponse> replaceThroughput(ThroughputProperties throughputProperties) {
return this.read()
.flatMap(response -> this.getDocClientWrapper()
.queryOffers(getOfferQuerySpecFromResourceId(response.getProperties()
.getResourceId()),
new FeedOptions())
.single()
.flatMap(offerFeedResponse -> {
if (offerFeedResponse.getResults().isEmpty()) {
return Mono.error(BridgeInternal
.createCosmosClientException(
HttpConstants.StatusCodes.BADREQUEST,
"No offers found for the " +
"resource " + this.getId()));
}

Offer existingOffer = offerFeedResponse.getResults().get(0);
Offer updatedOffer =
ModelBridgeInternal.updateOfferFromProperties(existingOffer,
throughputProperties);

return this.getDocClientWrapper()
.replaceOffer(updatedOffer)
.single();
})
.map(ModelBridgeInternal::createThroughputRespose));
}

/**
* Gets the throughput of the database
*
* @return the mono containing throughput response
*/
public Mono<ThroughputResponse> readThroughput() {
return this.read()
.flatMap(response -> getDocClientWrapper()
.queryOffers(getOfferQuerySpecFromResourceId(response.getProperties()
.getResourceId()),
new FeedOptions())
.single()
.flatMap(offerFeedResponse -> {
if (offerFeedResponse.getResults().isEmpty()) {
return Mono.error(BridgeInternal
.createCosmosClientException(
HttpConstants.StatusCodes.BADREQUEST,
"No offers found for the " +
"resource " + this.getId()));
}
return getDocClientWrapper()
.readOffer(offerFeedResponse.getResults()
.get(0)
.getSelfLink())
.single();
})
.map(ModelBridgeInternal::createThroughputRespose));
}

SqlQuerySpec getOfferQuerySpecFromResourceId(String resourceId) {
String queryText = "select * from c where c.offerResourceId = @resourceId";
SqlQuerySpec querySpec = new SqlQuerySpec(queryText);
List<SqlParameter> parameters = Collections
.singletonList(new SqlParameter("@resourceId", resourceId));
querySpec.setParameters(parameters);
return querySpec;
}

CosmosAsyncClient getClient() {
return client;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import com.azure.cosmos.models.ModelBridgeInternal;
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.models.SqlQuerySpec;
import com.azure.cosmos.models.ThroughputProperties;
import com.azure.cosmos.models.ThroughputResponse;
import com.azure.cosmos.util.CosmosPagedFlux;
import com.azure.cosmos.util.CosmosPagedIterable;
import com.azure.cosmos.util.UtilBridgeInternal;
Expand Down Expand Up @@ -140,6 +142,24 @@ public Integer replaceProvisionedThroughput(int requestUnitsPerSecond) throws Co
.replaceProvisionedThroughput(requestUnitsPerSecond));
}

/**
* Sets the throughput.
*
* @param throughputProperties the throughput properties
* @return the throughput response
*/
public ThroughputResponse replaceThroughput(ThroughputProperties throughputProperties) {
return database.throughputResponseToBlock(this.asyncContainer.replaceThroughput(throughputProperties));
}

/**
* Gets the throughput.
*
* @return the throughput response
*/
public ThroughputResponse readThroughput() {
return database.throughputResponseToBlock(this.asyncContainer.readThroughput());
}

/* CosmosAsyncItem operations */

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import com.azure.cosmos.models.FeedOptions;
import com.azure.cosmos.models.ModelBridgeInternal;
import com.azure.cosmos.models.SqlQuerySpec;
import com.azure.cosmos.models.ThroughputProperties;
import com.azure.cosmos.models.ThroughputResponse;
import com.azure.cosmos.util.CosmosPagedFlux;
import com.azure.cosmos.util.CosmosPagedIterable;
import com.azure.cosmos.util.UtilBridgeInternal;
Expand Down Expand Up @@ -155,6 +157,24 @@ public CosmosContainerResponse createContainer(
options));
}

/**
* Creates a cosmos container.
*
* @param containerProperties the container properties
* @param throughputProperties the throughput properties
* @param options the options
* @return the cosmos container response
* @throws CosmosClientException the cosmos client exception
*/
public CosmosContainerResponse createContainer(
CosmosContainerProperties containerProperties,
ThroughputProperties throughputProperties,
CosmosContainerRequestOptions options) throws CosmosClientException {
return this.mapContainerResponseAndBlock(databaseWrapper.createContainer(containerProperties,
throughputProperties,
options));
}

/**
* Create container cosmos sync container response.
*
Expand Down Expand Up @@ -479,7 +499,26 @@ public Integer replaceProvisionedThroughput(int requestUnitsPerSecond) throws Co
return throughputResponseToBlock(databaseWrapper.replaceProvisionedThroughput(requestUnitsPerSecond));
}

Integer throughputResponseToBlock(Mono<Integer> throughputResponse) throws CosmosClientException {
/**
* Sets the throughput.
*
* @param throughputProperties the throughput properties
* @return the throughput response
*/
public ThroughputResponse replaceThroughput(ThroughputProperties throughputProperties) {
return throughputResponseToBlock(databaseWrapper.replaceThroughput(throughputProperties));
}

/**
* Gets the throughput of the database
*
* @return the throughput response
*/
public ThroughputResponse readThroughput() {
return throughputResponseToBlock(databaseWrapper.readThroughput());
}

<T> T throughputResponseToBlock(Mono<T> throughputResponse) throws CosmosClientException {
try {
return throughputResponse.block();
} catch (Exception ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ public static final class Properties {
public static final String DOCUMENTS_LINK = "_docs";
public static final String RESOURCE_LINK = "resource";
public static final String MEDIA_LINK = "media";
public static final String AUTOPILOT_MAX_THROUGHPUT = "maxThroughput";
public static final String AUTOPILOT_AUTO_UPGRADE_POLICY = "autoUpgradePolicy";
public static final String AUTOPILOT_AUTO_THROUGHPUT_POLICY = "throughputPolicy";
public static final String AUTOPILOT_THROUGHPUT_POLICY_INCREMENT_PERCENT = "incrementPercent";
public static final String AUTOPILOT_SETTINGS = "offerAutopilotSettings";

public static final String PERMISSION_MODE = "permissionMode";
public static final String RESOURCE_KEY = "key";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,9 @@ public static class HttpHeaders {
public static final String OFFER_TYPE = "x-ms-offer-type";
public static final String OFFER_THROUGHPUT = "x-ms-offer-throughput";
public static final String OFFER_IS_RU_PER_MINUTE_THROUGHPUT_ENABLED = "x-ms-offer-is-ru-per-minute-throughput-enabled";
public static final String OFFER_MIN_THROUGHPUT = "x-ms-cosmos-min-throughput";
public static final String OFFER_AUTOPILOT_SETTINGS = "x-ms-cosmos-offer-autopilot-settings";
public static final String OFFER_REPLACE_PENDING = "x-ms-offer-replace-pending";

// Upsert header
public static final String IS_UPSERT = "x-ms-documentdb-is-upsert";
Expand Down
Loading