Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CosmosDB] Upgraded SDK to the latest version #1628

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions azurecosmos/conf/azurecosmos.properties
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
# See https://docs.microsoft.com/en-us/azure/cosmos-db/performance-tips-java-sdk-v4-sql for details on some of the options below.

# Azure Cosmos DB host uri (ex: https://p3rf.documents.azure.com:443/) and primary key.
# azurecosmos.primaryKey =
# azurecosmos.uri =
# azurecosmos.primaryKey =
# azurecosmos.uri =

# Database to be used, if not specified 'ycsb' will be used.
# azurecosmos.databaseName = ycsb
Expand All @@ -31,6 +31,10 @@
# The default is false to reduce output size.
# azurecosmos.includeExceptionStackInLog = false

# Determines if full request diagnostics need to be printed for high latency requests.
# The default is -1(no diagnostics at all)to reduce output size.
# azurecosmos.diagnosticsLatencyThresholdInMS = -1

# The value to be appended to the user-agent header.
# In most cases, you should leave this as "azurecosmos-ycsb".
# azurecosmos.userAgent = azurecosmos-ycsb
Expand All @@ -54,7 +58,7 @@
# Set the maximum retry duration in seconds.
# azurecosmos.maxRetryWaitTimeInSeconds = 30

# Set the value of the connection pool size in gateway mode.
# Set the value of the connection pool size in gateway mode.
# azurecosmos.gatewayMaxConnectionPoolSize = 30

# Set the value of the max connections per endpoint in direct mode.
Expand Down Expand Up @@ -82,3 +86,4 @@
# Sets the preferred page size when scanning.
# Default value is -1.
# azurecosmos.preferredPageSize = -1

147 changes: 84 additions & 63 deletions azurecosmos/src/main/java/site/ycsb/db/AzureCosmosClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,6 @@

package site.ycsb.db;

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.azure.cosmos.ConsistencyLevel;
import com.azure.cosmos.CosmosClient;
import com.azure.cosmos.CosmosClientBuilder;
Expand All @@ -42,6 +27,7 @@
import com.azure.cosmos.ThrottlingRetryOptions;
import com.azure.cosmos.models.CosmosItemRequestOptions;
import com.azure.cosmos.models.CosmosItemResponse;
import com.azure.cosmos.models.CosmosPatchOperations;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.models.PartitionKey;
Expand All @@ -51,15 +37,28 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import site.ycsb.ByteIterator;
import site.ycsb.DB;
import site.ycsb.DBException;
import site.ycsb.Status;
import site.ycsb.StringByteIterator;

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Azure Cosmos DB Java SDK 4.6.0 client for YCSB.
* Azure Cosmos DB Java SDK 4.28.0 client for YCSB.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

4.28.0 -> 4.34.0

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. Change pushed to the same pull request!

*/

public class AzureCosmosClient extends DB {
Expand All @@ -74,7 +73,7 @@ public class AzureCosmosClient extends DB {
private static final int DEFAULT_MAX_DEGREE_OF_PARALLELISM = -1;
private static final int DEFAULT_MAX_BUFFERED_ITEM_COUNT = 0;
private static final int DEFAULT_PREFERRED_PAGE_SIZE = -1;
public static final int NUM_UPDATE_ATTEMPTS = 4;
private static final int DEFAULT_DIAGNOSTICS_LATENCY_THRESHOLD_IN_MS = -1;
private static final boolean DEFAULT_INCLUDE_EXCEPTION_STACK_IN_LOG = false;
private static final String DEFAULT_USER_AGENT = "azurecosmos-ycsb";

Expand All @@ -93,6 +92,7 @@ public class AzureCosmosClient extends DB {
private static int maxDegreeOfParallelism;
private static int maxBufferedItemCount;
private static int preferredPageSize;
private static int diagnosticsLatencyThresholdInMS;
private static boolean includeExceptionStackInLog;
private static Map<String, CosmosContainer> containerCache;
private static String userAgent;
Expand Down Expand Up @@ -141,6 +141,10 @@ private void initAzureCosmosClient() throws DBException {
AzureCosmosClient.preferredPageSize = this.getIntProperty("azurecosmos.preferredPageSize",
DEFAULT_PREFERRED_PAGE_SIZE);

AzureCosmosClient.diagnosticsLatencyThresholdInMS = this.getIntProperty(
"azurecosmos.diagnosticsLatencyThresholdInMS",
DEFAULT_DIAGNOSTICS_LATENCY_THRESHOLD_IN_MS);

AzureCosmosClient.includeExceptionStackInLog = this.getBooleanProperty("azurecosmos.includeExceptionStackInLog",
DEFAULT_INCLUDE_EXCEPTION_STACK_IN_LOG);

Expand Down Expand Up @@ -310,10 +314,20 @@ public Status read(String table, String key, Set<String> fields, Map<String, Byt
}
StringByteIterator.putAllAsByteIterators(result, stringResults);
}

if (diagnosticsLatencyThresholdInMS > 0 &&
response.getDiagnostics().getDuration().compareTo(Duration.ofMillis(diagnosticsLatencyThresholdInMS)) > 0) {
LOGGER.warn(response.getDiagnostics().toString());
}

return Status.OK;
} catch (CosmosException e) {
LOGGER.error("Failed to read key {} in collection {} in database {}", key, table, AzureCosmosClient.databaseName,
e);
int statusCode = e.getStatusCode();
if (!AzureCosmosClient.includeExceptionStackInLog) {
e = null;
}
LOGGER.error("Failed to read key {} in collection {} in database {} statusCode {}", key, table,
AzureCosmosClient.databaseName, statusCode, e);
return Status.NOT_FOUND;
}
}
Expand All @@ -333,7 +347,7 @@ public Status read(String table, String key, Set<String> fields, Map<String, Byt
*/
@Override
public Status scan(String table, String startkey, int recordcount, Set<String> fields,
Vector<HashMap<String, ByteIterator>> result) {
Vector<HashMap<String, ByteIterator>> result) {
try {
CosmosQueryRequestOptions queryOptions = new CosmosQueryRequestOptions();
queryOptions.setMaxDegreeOfParallelism(AzureCosmosClient.maxDegreeOfParallelism);
Expand Down Expand Up @@ -369,11 +383,12 @@ public Status scan(String table, String startkey, int recordcount, Set<String> f
}
return Status.OK;
} catch (CosmosException e) {
int statusCode = e.getStatusCode();
if (!AzureCosmosClient.includeExceptionStackInLog) {
e = null;
}
LOGGER.error("Failed to query key {} from collection {} in database {}", startkey, table,
AzureCosmosClient.databaseName, e);
LOGGER.error("Failed to query key {} from collection {} in database {} statusCode {}", startkey, table,
AzureCosmosClient.databaseName, statusCode, e);
}
return Status.ERROR;
}
Expand All @@ -390,43 +405,33 @@ public Status scan(String table, String startkey, int recordcount, Set<String> f
*/
@Override
public Status update(String table, String key, Map<String, ByteIterator> values) {
try {
CosmosContainer container = AzureCosmosClient.containerCache.get(table);
if (container == null) {
container = AzureCosmosClient.database.getContainer(table);
AzureCosmosClient.containerCache.put(table, container);
}

String readEtag = "";

// Azure Cosmos DB does not have patch support. Until then, we need to read
// the document, update it, and then write it back.
// This could be made more efficient by using a stored procedure
// and doing the read/modify write on the server side. Perhaps
// that will be a future improvement.
for (int attempt = 0; attempt < NUM_UPDATE_ATTEMPTS; attempt++) {
try {
CosmosContainer container = AzureCosmosClient.containerCache.get(table);
if (container == null) {
container = AzureCosmosClient.database.getContainer(table);
AzureCosmosClient.containerCache.put(table, container);
}

CosmosItemResponse<ObjectNode> response = container.readItem(key, new PartitionKey(key), ObjectNode.class);
readEtag = response.getETag();
ObjectNode node = response.getItem();

for (Entry<String, ByteIterator> pair : values.entrySet()) {
node.put(pair.getKey(), pair.getValue().toString());
}
CosmosPatchOperations cosmosPatchOperations = CosmosPatchOperations.create();
for (Entry<String, ByteIterator> pair : values.entrySet()) {
cosmosPatchOperations.replace("/" + pair.getKey(), pair.getValue().toString());
}

CosmosItemRequestOptions requestOptions = new CosmosItemRequestOptions();
requestOptions.setIfMatchETag(readEtag);
PartitionKey pk = new PartitionKey(key);
container.replaceItem(node, key, pk, requestOptions);
PartitionKey pk = new PartitionKey(key);
CosmosItemResponse<ObjectNode> response = container.patchItem(key, pk, cosmosPatchOperations, ObjectNode.class);
if (diagnosticsLatencyThresholdInMS > 0 &&
response.getDiagnostics().getDuration().compareTo(Duration.ofMillis(diagnosticsLatencyThresholdInMS)) > 0) {
LOGGER.warn(response.getDiagnostics().toString());
}

return Status.OK;
} catch (CosmosException e) {
if (!AzureCosmosClient.includeExceptionStackInLog) {
e = null;
}
LOGGER.error("Failed to update key {} to collection {} in database {} on attempt {}", key, table,
AzureCosmosClient.databaseName, attempt, e);
return Status.OK;
} catch (CosmosException e) {
int statusCode = e.getStatusCode();
if (!AzureCosmosClient.includeExceptionStackInLog) {
e = null;
}
LOGGER.error("Failed to update key {} to collection {} in database {} statusCode {}", key, table,
AzureCosmosClient.databaseName, statusCode, e);
}

return Status.ERROR;
Expand Down Expand Up @@ -460,18 +465,26 @@ public Status insert(String table, String key, Map<String, ByteIterator> values)
for (Map.Entry<String, ByteIterator> pair : values.entrySet()) {
node.put(pair.getKey(), pair.getValue().toString());
}
CosmosItemResponse<ObjectNode> response;
if (AzureCosmosClient.useUpsert) {
container.upsertItem(node, pk, new CosmosItemRequestOptions());
response = container.upsertItem(node, pk, new CosmosItemRequestOptions());
} else {
container.createItem(node, pk, new CosmosItemRequestOptions());
response = container.createItem(node, pk, new CosmosItemRequestOptions());
}

if (diagnosticsLatencyThresholdInMS > 0 &&
response.getDiagnostics().getDuration().compareTo(Duration.ofMillis(diagnosticsLatencyThresholdInMS)) > 0) {
LOGGER.warn(response.getDiagnostics().toString());
}

return Status.OK;
} catch (CosmosException e) {
int statusCode = e.getStatusCode();
if (!AzureCosmosClient.includeExceptionStackInLog) {
e = null;
}
LOGGER.error("Failed to insert key {} to collection {} in database {}", key, table,
AzureCosmosClient.databaseName, e);
LOGGER.error("Failed to insert key {} to collection {} in database {} statusCode {}", key, table,
AzureCosmosClient.databaseName, statusCode, e);
}
return Status.ERROR;
}
Expand All @@ -487,14 +500,22 @@ public Status delete(String table, String key) {
container = AzureCosmosClient.database.getContainer(table);
AzureCosmosClient.containerCache.put(table, container);
}
container.deleteItem(key, new PartitionKey(key), new CosmosItemRequestOptions());
CosmosItemResponse<Object> response = container.deleteItem(key,
new PartitionKey(key),
new CosmosItemRequestOptions());
if (diagnosticsLatencyThresholdInMS > 0 &&
response.getDiagnostics().getDuration().compareTo(Duration.ofMillis(diagnosticsLatencyThresholdInMS)) > 0) {
LOGGER.warn(response.getDiagnostics().toString());
}

return Status.OK;
} catch (Exception e) {
} catch (CosmosException e) {
int statusCode = e.getStatusCode();
if (!AzureCosmosClient.includeExceptionStackInLog) {
e = null;
}
LOGGER.error("Failed to delete key {} in collection {}", key, table, e);
LOGGER.error("Failed to delete key {} in collection {} database {} statusCode {}", key, table,
AzureCosmosClient.databaseName, statusCode, e);
}
return Status.ERROR;
}
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ LICENSE file.
<aerospike.version>3.1.2</aerospike.version>
<arangodb.version>4.4.1</arangodb.version>
<asynchbase.version>1.8.2</asynchbase.version>
<azurecosmos.version>4.8.0</azurecosmos.version>
<azurecosmos.version>4.34.0</azurecosmos.version>
<azurestorage.version>4.0.0</azurestorage.version>
<cassandra.cql.version>3.0.0</cassandra.cql.version>
<cloudspanner.version>2.0.1</cloudspanner.version>
Expand Down