-
Notifications
You must be signed in to change notification settings - Fork 3.6k
Perform S3 directory deletion with batch requests #13974
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
9208595
3a9708c
9cabfc0
ba2d0b6
52e69aa
62a4412
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -124,6 +124,7 @@ | |
| import java.util.Iterator; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.Objects; | ||
| import java.util.Optional; | ||
| import java.util.OptionalInt; | ||
| import java.util.Set; | ||
|
|
@@ -142,6 +143,7 @@ | |
| import static com.amazonaws.services.s3.model.StorageClass.Glacier; | ||
| import static com.google.common.base.Preconditions.checkArgument; | ||
| import static com.google.common.base.Preconditions.checkPositionIndexes; | ||
| import static com.google.common.base.Preconditions.checkState; | ||
| import static com.google.common.base.Strings.isNullOrEmpty; | ||
| import static com.google.common.base.Strings.nullToEmpty; | ||
| import static com.google.common.base.Throwables.throwIfInstanceOf; | ||
|
|
@@ -606,25 +608,81 @@ public boolean rename(Path src, Path dst) | |
| public boolean delete(Path path, boolean recursive) | ||
| throws IOException | ||
| { | ||
| try { | ||
| if (!directory(path)) { | ||
| return deleteObject(keyFromPath(path)); | ||
| String key = keyFromPath(path); | ||
| if (recursive) { | ||
| DeletePrefixResult deletePrefixResult; | ||
| try { | ||
| deletePrefixResult = deletePrefix(path); | ||
| } | ||
| catch (AmazonClientException e) { | ||
| throw new IOException("Failed to delete paths with the prefix path " + path, e); | ||
| } | ||
| if (deletePrefixResult == DeletePrefixResult.NO_KEYS_FOUND) { | ||
| // If the provided key is not a "directory" prefix, attempt to delete the object with the specified key | ||
| deleteObject(key); | ||
|
findinpath marked this conversation as resolved.
Outdated
|
||
| } | ||
| else if (deletePrefixResult == DeletePrefixResult.DELETE_KEYS_FAILURE) { | ||
| return false; | ||
| } | ||
| deleteObject(key + DIRECTORY_SUFFIX); | ||
| } | ||
| catch (FileNotFoundException e) { | ||
| return false; | ||
| else { | ||
| Iterator<ListObjectsV2Result> listingsIterator = listObjects(path, OptionalInt.of(2), true); | ||
| Iterator<String> objectKeysIterator = Iterators.concat(Iterators.transform(listingsIterator, TrinoS3FileSystem::keysFromRecursiveListing)); | ||
| if (objectKeysIterator.hasNext()) { | ||
| String childKey = objectKeysIterator.next(); | ||
| if (!Objects.equals(childKey, key + PATH_SEPARATOR) || objectKeysIterator.hasNext()) { | ||
| throw new IOException("Directory " + path + " is not empty"); | ||
| } | ||
| deleteObject(childKey); | ||
| } | ||
| else { | ||
| // Avoid deleting the bucket in case that the provided path points to the bucket root | ||
|
findepi marked this conversation as resolved.
|
||
| if (!key.isEmpty()) { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we move this check to an
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Moving the check up would cause the method to return |
||
| deleteObject(key); | ||
|
findepi marked this conversation as resolved.
Outdated
|
||
| } | ||
| } | ||
| deleteObject(key + DIRECTORY_SUFFIX); | ||
|
findepi marked this conversation as resolved.
Outdated
|
||
| } | ||
| return true; | ||
| } | ||
|
|
||
| if (!recursive) { | ||
| throw new IOException("Directory " + path + " is not empty"); | ||
| private DeletePrefixResult deletePrefix(Path prefix) | ||
| { | ||
| String bucketName = getBucketName(uri); | ||
| Iterator<ListObjectsV2Result> listings = listObjects(prefix, OptionalInt.empty(), true); | ||
| Iterator<String> objectKeys = Iterators.concat(Iterators.transform(listings, TrinoS3FileSystem::keysFromRecursiveListing)); | ||
| Iterator<List<String>> objectKeysBatches = Iterators.partition(objectKeys, DELETE_BATCH_SIZE); | ||
| if (!objectKeysBatches.hasNext()) { | ||
| return DeletePrefixResult.NO_KEYS_FOUND; | ||
| } | ||
|
|
||
| for (FileStatus file : listStatus(path)) { | ||
| delete(file.getPath(), true); | ||
| boolean allKeysDeleted = true; | ||
| while (objectKeysBatches.hasNext()) { | ||
| String[] objectKeysBatch = objectKeysBatches.next().toArray(String[]::new); | ||
| try { | ||
| s3.deleteObjects(new DeleteObjectsRequest(bucketName) | ||
| .withKeys(objectKeysBatch) | ||
| .withRequesterPays(requesterPaysEnabled) | ||
| .withQuiet(true)); | ||
| } | ||
| catch (AmazonS3Exception e) { | ||
| log.debug(e, "Failed to delete objects from the bucket %s under the prefix '%s'", bucketName, prefix); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| allKeysDeleted = false; | ||
| } | ||
| } | ||
| deleteObject(keyFromPath(path) + DIRECTORY_SUFFIX); | ||
|
|
||
| return true; | ||
| return allKeysDeleted ? DeletePrefixResult.ALL_KEYS_DELETED : DeletePrefixResult.DELETE_KEYS_FAILURE; | ||
| } | ||
|
|
||
| @VisibleForTesting | ||
| static Iterator<String> keysFromRecursiveListing(ListObjectsV2Result listing) | ||
| { | ||
| checkState( | ||
| listing.getCommonPrefixes() == null || listing.getCommonPrefixes().isEmpty(), | ||
| "No common prefixes should be present when listing without a path delimiter"); | ||
|
|
||
| return Iterators.transform(listing.getObjectSummaries().iterator(), S3ObjectSummary::getKey); | ||
| } | ||
|
|
||
| private boolean directory(Path path) | ||
|
|
@@ -701,12 +759,29 @@ public boolean isFilesOnly() | |
| { | ||
| return (this == SHALLOW_FILES_ONLY || this == RECURSIVE_FILES_ONLY); | ||
| } | ||
|
|
||
| public boolean isRecursive() | ||
| { | ||
| return this == RECURSIVE_FILES_ONLY; | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * List all objects rooted at the provided path. | ||
| */ | ||
| private Iterator<LocatedFileStatus> listPath(Path path, OptionalInt initialMaxKeys, ListingMode mode) | ||
| { | ||
| Iterator<ListObjectsV2Result> listings = listObjects(path, initialMaxKeys, mode.isRecursive()); | ||
|
|
||
| Iterator<LocatedFileStatus> results = Iterators.concat(Iterators.transform(listings, this::statusFromListing)); | ||
| if (mode.isFilesOnly()) { | ||
| // Even recursive listing can still contain empty "directory" objects, must filter them out | ||
| results = Iterators.filter(results, LocatedFileStatus::isFile); | ||
| } | ||
| return results; | ||
| } | ||
|
|
||
| private Iterator<ListObjectsV2Result> listObjects(Path path, OptionalInt initialMaxKeys, boolean recursive) | ||
|
findinpath marked this conversation as resolved.
Outdated
|
||
| { | ||
| String key = keyFromPath(path); | ||
| if (!key.isEmpty()) { | ||
|
|
@@ -716,12 +791,12 @@ private Iterator<LocatedFileStatus> listPath(Path path, OptionalInt initialMaxKe | |
| ListObjectsV2Request request = new ListObjectsV2Request() | ||
| .withBucketName(getBucketName(uri)) | ||
| .withPrefix(key) | ||
| .withDelimiter(mode == ListingMode.RECURSIVE_FILES_ONLY ? null : PATH_SEPARATOR) | ||
| .withDelimiter(recursive ? null : PATH_SEPARATOR) | ||
| .withMaxKeys(initialMaxKeys.isPresent() ? initialMaxKeys.getAsInt() : null) | ||
| .withRequesterPays(requesterPaysEnabled); | ||
|
|
||
| STATS.newListObjectsCall(); | ||
| Iterator<ListObjectsV2Result> listings = new AbstractSequentialIterator<>(s3.listObjectsV2(request)) | ||
| return new AbstractSequentialIterator<>(s3.listObjectsV2(request)) | ||
| { | ||
| @Override | ||
| protected ListObjectsV2Result computeNext(ListObjectsV2Result previous) | ||
|
|
@@ -734,13 +809,6 @@ protected ListObjectsV2Result computeNext(ListObjectsV2Result previous) | |
| return s3.listObjectsV2(request); | ||
| } | ||
| }; | ||
|
|
||
| Iterator<LocatedFileStatus> results = Iterators.concat(Iterators.transform(listings, this::statusFromListing)); | ||
| if (mode.isFilesOnly()) { | ||
| // Even recursive listing can still contain empty "directory" objects, must filter them out | ||
| results = Iterators.filter(results, LocatedFileStatus::isFile); | ||
| } | ||
| return results; | ||
| } | ||
|
|
||
| private Iterator<LocatedFileStatus> statusFromListing(ListObjectsV2Result listing) | ||
|
|
@@ -1896,4 +1964,11 @@ private static String getMd5AsBase64(byte[] data, int offset, int length) | |
| byte[] md5 = md5().hashBytes(data, offset, length).asBytes(); | ||
| return Base64.getEncoder().encodeToString(md5); | ||
| } | ||
|
|
||
| private enum DeletePrefixResult | ||
| { | ||
| NO_KEYS_FOUND, | ||
| ALL_KEYS_DELETED, | ||
| DELETE_KEYS_FAILURE | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.