Skip to content

Commit

Permalink
Limit batch size (storage performs better if the batches are split)
Browse files Browse the repository at this point in the history
  • Loading branch information
mziccard committed Apr 27, 2016
1 parent 6ceb59f commit 23f4f23
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,15 @@ public DefaultStorageRpc(StorageOptions options) {

private class DefaultRpcBatch implements RpcBatch {

private static final int MAX_BATCH_DELETES = 100;
// Batch size is limited as, due to some current service implementation details, the service
// performs better if the batches are split for better distribution. See
// https://github.com/GoogleCloudPlatform/gcloud-java/pull/952#issuecomment-213466772 for
// background.
private static final int MAX_BATCH_SIZE = 100;

private final Storage storage;
private final LinkedList<BatchRequest> batches;
private int deleteCount;
private int currentBatchSize;

private DefaultRpcBatch(Storage storage) {
this.storage = storage;
Expand All @@ -111,12 +115,12 @@ private DefaultRpcBatch(Storage storage) {
public void addDelete(StorageObject storageObject, RpcBatch.Callback<Void> callback,
Map<Option, ?> options) {
try {
if (deleteCount == MAX_BATCH_DELETES) {
if (currentBatchSize == MAX_BATCH_SIZE) {
batches.add(storage.batch());
deleteCount = 0;
currentBatchSize = 0;
}
deleteCall(storageObject, options).queue(batches.getLast(), toJsonCallback(callback));
deleteCount++;
currentBatchSize++;
} catch (IOException ex) {
throw translate(ex);
}
Expand All @@ -126,7 +130,12 @@ public void addDelete(StorageObject storageObject, RpcBatch.Callback<Void> callb
public void addPatch(StorageObject storageObject, RpcBatch.Callback<StorageObject> callback,
Map<Option, ?> options) {
try {
patchCall(storageObject, options).queue(batches.getFirst(), toJsonCallback(callback));
if (currentBatchSize == MAX_BATCH_SIZE) {
batches.add(storage.batch());
currentBatchSize = 0;
}
patchCall(storageObject, options).queue(batches.getLast(), toJsonCallback(callback));
currentBatchSize++;
} catch (IOException ex) {
throw translate(ex);
}
Expand All @@ -136,7 +145,12 @@ public void addPatch(StorageObject storageObject, RpcBatch.Callback<StorageObjec
public void addGet(StorageObject storageObject, RpcBatch.Callback<StorageObject> callback,
Map<Option, ?> options) {
try {
getCall(storageObject, options).queue(batches.getFirst(), toJsonCallback(callback));
if (currentBatchSize == MAX_BATCH_SIZE) {
batches.add(storage.batch());
currentBatchSize = 0;
}
getCall(storageObject, options).queue(batches.getLast(), toJsonCallback(callback));
currentBatchSize++;
} catch (IOException ex) {
throw translate(ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public class ITStorageTest {
private static final String CONTENT_TYPE = "text/plain";
private static final byte[] BLOB_BYTE_CONTENT = {0xD, 0xE, 0xA, 0xD};
private static final String BLOB_STRING_CONTENT = "Hello Google Cloud Storage!";
private static final int MAX_BATCH_DELETES = 100;
private static final int MAX_BATCH_SIZE = 100;

@BeforeClass
public static void beforeClass() {
Expand Down Expand Up @@ -811,19 +811,31 @@ public void testBatchRequest() {
}

@Test
public void testBatchRequestManyDeletes() {
List<BlobId> blobsToDelete = Lists.newArrayListWithCapacity(2 * MAX_BATCH_DELETES);
public void testBatchRequestManyOperations() {
List<StorageBatchResult<Boolean>> deleteResults =
Lists.newArrayListWithCapacity(2 * MAX_BATCH_DELETES);
for (int i = 0; i < 2 * MAX_BATCH_DELETES; i++) {
blobsToDelete.add(BlobId.of(BUCKET, "test-batch-request-many-deletes-blob-" + i));
}
Lists.newArrayListWithCapacity(MAX_BATCH_SIZE);
List<StorageBatchResult<Blob>> getResults =
Lists.newArrayListWithCapacity(MAX_BATCH_SIZE / 2);
List<StorageBatchResult<Blob>> updateResults =
Lists.newArrayListWithCapacity(MAX_BATCH_SIZE / 2);
StorageBatch batch = storage.batch();
for (BlobId blob : blobsToDelete) {
deleteResults.add(batch.delete(blob));
for (int i = 0; i < MAX_BATCH_SIZE; i++) {
BlobId blobId = BlobId.of(BUCKET, "test-batch-request-many-operations-blob-" + i);
deleteResults.add(batch.delete(blobId));
}
for (int i = 0; i < MAX_BATCH_SIZE / 2; i++) {
BlobId blobId = BlobId.of(BUCKET, "test-batch-request-many-operations-blob-" + i);
getResults.add(batch.get(blobId));
}
for (int i = 0; i < MAX_BATCH_SIZE / 2; i++) {
BlobInfo blob =
BlobInfo.builder(BlobId.of(BUCKET, "test-batch-request-many-operations-blob-" + i))
.build();
updateResults.add(batch.update(blob));
}
String sourceBlobName1 = "test-batch-request-many-deletes-source-blob-1";
String sourceBlobName2 = "test-batch-request-many-deletes-source-blob-2";

String sourceBlobName1 = "test-batch-request-many-operations-source-blob-1";
String sourceBlobName2 = "test-batch-request-many-operations-source-blob-2";
BlobInfo sourceBlob1 = BlobInfo.builder(BUCKET, sourceBlobName1).build();
BlobInfo sourceBlob2 = BlobInfo.builder(BUCKET, sourceBlobName2).build();
assertNotNull(storage.create(sourceBlob1));
Expand All @@ -836,21 +848,32 @@ public void testBatchRequestManyDeletes() {
batch.submit();

// Check deletes
for (StorageBatchResult<Boolean> deleteResult : deleteResults) {
assertFalse(deleteResult.get());
for (StorageBatchResult<Boolean> failedDeleteResult : deleteResults) {
assertFalse(failedDeleteResult.get());
}

// Check gets
for (StorageBatchResult<Blob> failedGetResult : getResults) {
assertNull(failedGetResult.get());
}
Blob remoteBlob1 = getResult.get();
assertEquals(sourceBlob1.bucket(), remoteBlob1.bucket());
assertEquals(sourceBlob1.name(), remoteBlob1.name());

// Check updates
for (StorageBatchResult<Blob> failedUpdateResult : updateResults) {
try {
failedUpdateResult.get();
fail("Expected StorageException");
} catch (StorageException ex) {
// expected
}
}
Blob remoteUpdatedBlob2 = updateResult.get();
assertEquals(sourceBlob2.bucket(), remoteUpdatedBlob2.bucket());
assertEquals(sourceBlob2.name(), remoteUpdatedBlob2.name());
assertEquals(updatedBlob2.contentType(), remoteUpdatedBlob2.contentType());

// Check gets
Blob remoteBlob1 = getResult.get();
assertEquals(sourceBlob1.bucket(), remoteBlob1.bucket());
assertEquals(sourceBlob1.name(), remoteBlob1.name());

assertTrue(remoteBlob1.delete());
assertTrue(remoteUpdatedBlob2.delete());
}
Expand Down

0 comments on commit 23f4f23

Please sign in to comment.