Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -232,4 +232,28 @@ private static <T> boolean addAll(Collection<T> addTo,
return addAll(addTo, elementsToAdd.iterator());
}

/**
* Returns consecutive sub-lists of a list, each of the same size
* (the final list may be smaller).
* @param originalList original big list.
* @param pageSize desired size of each sublist ( last one
* may be smaller)
* @return a list of sub lists.
*/
public static <T> List<List<T>> partition(List<T> originalList, int pageSize) {

Preconditions.checkArgument(originalList != null && originalList.size() > 0,
"Invalid original list");
Preconditions.checkArgument(pageSize > 0, "Page size should " +
"be greater than 0 for performing partition");

List<List<T>> result = new ArrayList<>();
int i=0;
while (i < originalList.size()) {
result.add(originalList.subList(i,
Math.min(i + pageSize, originalList.size())));
i = i + pageSize;
}
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@

package org.apache.hadoop.util;

import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Test;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
Expand Down Expand Up @@ -79,6 +81,48 @@ public void testItrLinkedLists() {
Assert.assertEquals(4, list.size());
}

@Test
public void testListsPartition() {
List<String> list = new ArrayList<>();
list.add("a");
list.add("b");
list.add("c");
list.add("d");
list.add("e");
List<List<String>> res = Lists.
partition(list, 2);
Assertions.assertThat(res)
.describedAs("Number of partitions post partition")
.hasSize(3);
Assertions.assertThat(res.get(0))
.describedAs("Number of elements in first partition")
.hasSize(2);
Assertions.assertThat(res.get(2))
.describedAs("Number of elements in last partition")
.hasSize(1);

List<List<String>> res2 = Lists.
partition(list, 1);
Assertions.assertThat(res2)
.describedAs("Number of partitions post partition")
.hasSize(5);
Assertions.assertThat(res2.get(0))
.describedAs("Number of elements in first partition")
.hasSize(1);
Assertions.assertThat(res2.get(4))
.describedAs("Number of elements in last partition")
.hasSize(1);

List<List<String>> res3 = Lists.
partition(list, 6);
Assertions.assertThat(res3)
.describedAs("Number of partitions post partition")
.hasSize(1);
Assertions.assertThat(res3.get(0))
.describedAs("Number of elements in first partition")
.hasSize(5);
}

@Test
public void testArrayListWithSize() {
List<String> list = Lists.newArrayListWithCapacity(3);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.DurationInfo;
import org.apache.hadoop.util.LambdaUtils;
import org.apache.hadoop.util.Lists;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
Expand Down Expand Up @@ -225,6 +226,7 @@
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfOperation;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfSupplier;
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
import static org.apache.hadoop.util.Preconditions.checkArgument;
import static org.apache.hadoop.util.functional.RemoteIterators.typeCastingRemoteIterator;

/**
Expand Down Expand Up @@ -550,6 +552,8 @@ public void initialize(URI name, Configuration originalConf)

pageSize = intOption(getConf(), BULK_DELETE_PAGE_SIZE,
BULK_DELETE_PAGE_SIZE_DEFAULT, 0);
checkArgument(pageSize <= InternalConstants.MAX_ENTRIES_TO_DELETE,
"page size out of range: %s", pageSize);
listing = new Listing(listingOperationCallbacks, createStoreContext());
} catch (AmazonClientException e) {
// amazon client exception: stop all services then throw the translation
Expand Down Expand Up @@ -2026,14 +2030,12 @@ public CopyResult copyFile(final String srcKey,
}

@Override
public DeleteObjectsResult removeKeys(
final List<DeleteObjectsRequest.KeyVersion> keysToDelete,
final boolean deleteFakeDir,
final boolean quiet)
public void removeKeys(
final List<DeleteObjectsRequest.KeyVersion> keysToDelete,
final boolean deleteFakeDir)
throws MultiObjectDeleteException, AmazonClientException, IOException {
auditSpan.activate();
return S3AFileSystem.this.removeKeys(keysToDelete, deleteFakeDir,
quiet);
S3AFileSystem.this.removeKeys(keysToDelete, deleteFakeDir);
}

@Override
Expand Down Expand Up @@ -2818,10 +2820,6 @@ public void incrementPutProgressStatistics(String key, long bytes) {
* @param keysToDelete collection of keys to delete on the s3-backend.
* if empty, no request is made of the object store.
* @param deleteFakeDir indicates whether this is for deleting fake dirs
* @param quiet should a bulk query be quiet, or should its result list
* all deleted keys?
* @return the deletion result if a multi object delete was invoked
* and it returned without a failure.
* @throws InvalidRequestException if the request was rejected due to
* a mistaken attempt to delete the root directory.
* @throws MultiObjectDeleteException one or more of the keys could not
Expand All @@ -2831,10 +2829,9 @@ public void incrementPutProgressStatistics(String key, long bytes) {
* @throws AmazonClientException other amazon-layer failure.
*/
@Retries.RetryRaw
private DeleteObjectsResult removeKeysS3(
List<DeleteObjectsRequest.KeyVersion> keysToDelete,
boolean deleteFakeDir,
boolean quiet)
private void removeKeysS3(
List<DeleteObjectsRequest.KeyVersion> keysToDelete,
boolean deleteFakeDir)
throws MultiObjectDeleteException, AmazonClientException,
IOException {
if (LOG.isDebugEnabled()) {
Expand All @@ -2847,16 +2844,28 @@ private DeleteObjectsResult removeKeysS3(
}
if (keysToDelete.isEmpty()) {
// exit fast if there are no keys to delete
return null;
return;
}
for (DeleteObjectsRequest.KeyVersion keyVersion : keysToDelete) {
blockRootDelete(keyVersion.getKey());
}
DeleteObjectsResult result = null;
try {
if (enableMultiObjectsDelete) {
result = deleteObjects(
getRequestFactory().newBulkDeleteRequest(keysToDelete, quiet));
if (keysToDelete.size() <= pageSize) {
deleteObjects(getRequestFactory()
.newBulkDeleteRequest(keysToDelete));
} else {
// Multi object deletion of more than 1000 keys is not supported
// by s3. So we are paging the keys by page size.
LOG.debug("Partitioning the keys to delete as it is more than " +
"page size. Number of keys: {}, Page size: {}",
keysToDelete.size(), pageSize);
for (List<DeleteObjectsRequest.KeyVersion> batchOfKeysToDelete :
Lists.partition(keysToDelete, pageSize)) {
deleteObjects(getRequestFactory()
.newBulkDeleteRequest(batchOfKeysToDelete));
}
}
} else {
for (DeleteObjectsRequest.KeyVersion keyVersion : keysToDelete) {
deleteObject(keyVersion.getKey());
Expand All @@ -2872,7 +2881,6 @@ private DeleteObjectsResult removeKeysS3(
throw ex;
}
noteDeleted(keysToDelete.size(), deleteFakeDir);
return result;
}

/**
Expand All @@ -2889,7 +2897,7 @@ private void noteDeleted(final int count, final boolean deleteFakeDir) {
}

/**
* Invoke {@link #removeKeysS3(List, boolean, boolean)}.
* Invoke {@link #removeKeysS3(List, boolean)}.
* If a {@code MultiObjectDeleteException} is raised, the
* relevant statistics are updated.
*
Expand All @@ -2910,35 +2918,9 @@ public void removeKeys(
final boolean deleteFakeDir)
throws MultiObjectDeleteException, AmazonClientException,
IOException {
removeKeys(keysToDelete, deleteFakeDir,
true);
}

/**
* Invoke {@link #removeKeysS3(List, boolean, boolean)}.
* @param keysToDelete collection of keys to delete on the s3-backend.
* if empty, no request is made of the object store.
* @param deleteFakeDir indicates whether this is for deleting fake dirs.
* @param quiet should a bulk query be quiet, or should its result list
* all deleted keys
* @return the deletion result if a multi object delete was invoked
* and it returned without a failure, else null.
* @throws InvalidRequestException if the request was rejected due to
* a mistaken attempt to delete the root directory.
* @throws MultiObjectDeleteException one or more of the keys could not
* be deleted in a multiple object delete operation.
* @throws AmazonClientException amazon-layer failure.
* @throws IOException other IO Exception.
*/
@Retries.RetryRaw
private DeleteObjectsResult removeKeys(
final List<DeleteObjectsRequest.KeyVersion> keysToDelete,
final boolean deleteFakeDir,
final boolean quiet)
throws MultiObjectDeleteException, AmazonClientException, IOException {
try (DurationInfo ignored = new DurationInfo(LOG, false,
"Deleting %d keys", keysToDelete.size())) {
return removeKeysS3(keysToDelete, deleteFakeDir, quiet);
"Deleting %d keys", keysToDelete.size())) {
removeKeysS3(keysToDelete, deleteFakeDir);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,12 +291,9 @@ ListObjectsV2Request newListObjectsV2Request(String key,
/**
* Bulk delete request.
* @param keysToDelete list of keys to delete.
* @param quiet should a bulk query be quiet, or should its result list
* all deleted keys?
* @return the request
*/
DeleteObjectsRequest newBulkDeleteRequest(
List<DeleteObjectsRequest.KeyVersion> keysToDelete,
boolean quiet);
List<DeleteObjectsRequest.KeyVersion> keysToDelete);

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.util.stream.Collectors;

import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.DeleteObjectsResult;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;
import org.slf4j.Logger;
Expand Down Expand Up @@ -365,8 +364,7 @@ private CompletableFuture<Void> submitDelete(
callableWithinAuditSpan(
getAuditSpan(), () -> {
asyncDeleteAction(
keyList,
LOG.isDebugEnabled());
keyList);
return null;
}));
}
Expand All @@ -376,69 +374,42 @@ private CompletableFuture<Void> submitDelete(
* the keys from S3 and paths from S3Guard.
*
* @param keyList keys to delete.
* @param auditDeletedKeys should the results be audited and undeleted
* entries logged?
* @throws IOException failure
*/
@Retries.RetryTranslated
private void asyncDeleteAction(
final List<DeleteEntry> keyList,
final boolean auditDeletedKeys)
final List<DeleteEntry> keyList)
throws IOException {
List<DeleteObjectsResult.DeletedObject> deletedObjects = new ArrayList<>();
try (DurationInfo ignored =
new DurationInfo(LOG, false,
"Delete page of %d keys", keyList.size())) {
DeleteObjectsResult result;
if (!keyList.isEmpty()) {
// first delete the files.
List<DeleteObjectsRequest.KeyVersion> files = keyList.stream()
.filter(e -> !e.isDirMarker)
.map(e -> e.keyVersion)
.collect(Collectors.toList());
LOG.debug("Deleting of {} file objects", files.size());
result = Invoker.once("Remove S3 Files",
Invoker.once("Remove S3 Files",
status.getPath().toString(),
() -> callbacks.removeKeys(
files,
false,
!auditDeletedKeys));
if (result != null) {
deletedObjects.addAll(result.getDeletedObjects());
}
false
));
// now the dirs
List<DeleteObjectsRequest.KeyVersion> dirs = keyList.stream()
.filter(e -> e.isDirMarker)
.map(e -> e.keyVersion)
.collect(Collectors.toList());
LOG.debug("Deleting of {} directory markers", dirs.size());
// This is invoked with deleteFakeDir.
result = Invoker.once("Remove S3 Dir Markers",
Invoker.once("Remove S3 Dir Markers",
status.getPath().toString(),
() -> callbacks.removeKeys(
dirs,
true,
!auditDeletedKeys));
if (result != null) {
deletedObjects.addAll(result.getDeletedObjects());
}
}
if (auditDeletedKeys) {
// audit the deleted keys
if (deletedObjects.size() != keyList.size()) {
// size mismatch
LOG.warn("Size mismatch in deletion operation. "
+ "Expected count of deleted files: {}; "
+ "actual: {}",
keyList.size(), deletedObjects.size());
// strip out the deleted keys
for (DeleteObjectsResult.DeletedObject del : deletedObjects) {
keyList.removeIf(kv -> kv.getKey().equals(del.getKey()));
}
for (DeleteEntry kv : keyList) {
LOG.debug("{}", kv.getKey());
}
}
true
));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@

import com.amazonaws.AmazonClientException;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.DeleteObjectsResult;
import com.amazonaws.services.s3.model.MultiObjectDeleteException;
import com.amazonaws.services.s3.transfer.model.CopyResult;

Expand Down Expand Up @@ -138,10 +137,6 @@ CopyResult copyFile(String srcKey,
* @param keysToDelete collection of keys to delete on the s3-backend.
* if empty, no request is made of the object store.
* @param deleteFakeDir indicates whether this is for deleting fake dirs.
* @param quiet should a bulk query be quiet, or should its result list
* all deleted keys
* @return the deletion result if a multi object delete was invoked
* and it returned without a failure, else null.
* @throws InvalidRequestException if the request was rejected due to
* a mistaken attempt to delete the root directory.
* @throws MultiObjectDeleteException one or more of the keys could not
Expand All @@ -150,10 +145,9 @@ CopyResult copyFile(String srcKey,
* @throws IOException other IO Exception.
*/
@Retries.RetryRaw
DeleteObjectsResult removeKeys(
List<DeleteObjectsRequest.KeyVersion> keysToDelete,
boolean deleteFakeDir,
boolean quiet)
void removeKeys(
List<DeleteObjectsRequest.KeyVersion> keysToDelete,
boolean deleteFakeDir)
throws MultiObjectDeleteException, AmazonClientException,
IOException;

Expand Down
Loading