Add Bulk Delete Api to BlobStore #40322
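This change introduces a bulk-delete operation on the blob store abstraction and gives the S3 repository an implementation backed by S3's multi-object delete API. As a rough sketch of what the new interface method could look like on BlobContainer — the method name and signature are taken from the S3 override in the diff below, while the default fallback body shown here is an assumption, not the PR's exact code:

import java.io.IOException;
import java.util.List;

public interface BlobContainer {

    /** Deletes a single blob, ignoring it if it does not exist. */
    void deleteBlobIgnoringIfNotExists(String blobName) throws IOException;

    /**
     * Deletes the given blobs, ignoring blobs that do not exist. Implementations such as the
     * S3 container below can override this to issue bulk requests instead of one call per blob.
     * (Hypothetical default body: fall back to per-blob deletes and aggregate failures.)
     */
    default void deleteBlobsIgnoringIfNotExists(List<String> blobNames) throws IOException {
        IOException aggregate = null;
        for (String blobName : blobNames) {
            try {
                deleteBlobIgnoringIfNotExists(blobName);
            } catch (IOException e) {
                if (aggregate == null) {
                    aggregate = e;
                } else {
                    aggregate.addSuppressed(e);
                }
            }
        }
        if (aggregate != null) {
            throw aggregate;
        }
    }
}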
Changes from all commits:
S3BlobContainer:

@@ -23,6 +23,7 @@
 import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
 import com.amazonaws.services.s3.model.AmazonS3Exception;
 import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest;
 import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
 import com.amazonaws.services.s3.model.ObjectListing;
 import com.amazonaws.services.s3.model.ObjectMetadata;
@@ -56,6 +57,12 @@
 class S3BlobContainer extends AbstractBlobContainer {

+    /**
+     * Maximum number of deletes in a {@link DeleteObjectsRequest}.
+     * @see <a href="https://docs.aws.amazon.com/AmazonS3/latest/API/multiobjectdeleteapi.html">S3 Documentation</a>.
+     */
+    private static final int MAX_BULK_DELETES = 1000;
+
     private final S3BlobStore blobStore;
     private final String keyPath;
@@ -118,6 +125,51 @@ public void deleteBlob(String blobName) throws IOException {
         deleteBlobIgnoringIfNotExists(blobName);
     }

+    @Override
+    public void deleteBlobsIgnoringIfNotExists(List<String> blobNames) throws IOException {
+        if (blobNames.isEmpty()) {
+            return;
+        }
+        try (AmazonS3Reference clientReference = blobStore.clientReference()) {
+            // S3 API only allows 1k blobs per delete so we split up the given blobs into requests of max. 1k deletes
+            final List<DeleteObjectsRequest> deleteRequests = new ArrayList<>();
+            final List<String> partition = new ArrayList<>();
+            for (String blob : blobNames) {
+                partition.add(buildKey(blob));
+                if (partition.size() == MAX_BULK_DELETES) {
+                    deleteRequests.add(bulkDelete(blobStore.bucket(), partition));
+                    partition.clear();
+                }
+            }
+            if (partition.isEmpty() == false) {
+                deleteRequests.add(bulkDelete(blobStore.bucket(), partition));
+            }
+            SocketAccess.doPrivilegedVoid(() -> {
+                AmazonClientException aex = null;
+                for (DeleteObjectsRequest deleteRequest : deleteRequests) {
+                    try {
+                        clientReference.client().deleteObjects(deleteRequest);
+                    } catch (AmazonClientException e) {
+                        if (aex == null) {
+                            aex = e;
+                        } else {
+                            aex.addSuppressed(e);
+                        }
+                    }
+                }
+                if (aex != null) {
+                    throw aex;
+                }
+            });
+        } catch (final AmazonClientException e) {
+            throw new IOException("Exception when deleting blobs [" + blobNames + "]", e);
+        }
+    }
+
+    private static DeleteObjectsRequest bulkDelete(String bucket, List<String> blobs) {
+        return new DeleteObjectsRequest(bucket).withKeys(blobs.toArray(Strings.EMPTY_ARRAY)).withQuiet(true);
+    }
+
     @Override
     public void deleteBlobIgnoringIfNotExists(String blobName) throws IOException {
         try (AmazonS3Reference clientReference = blobStore.clientReference()) {
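The loop above caps each request at MAX_BULK_DELETES keys, which is the documented limit of S3's multi-object delete API. A standalone sketch of the same batching idea in plain Java (the class and method names here are illustrative only, not part of the change):

import java.util.ArrayList;
import java.util.List;

public class BatchingSketch {
    private static final int MAX_BULK_DELETES = 1000; // S3 multi-object delete limit per request

    // Split keys into batches of at most MAX_BULK_DELETES, mirroring the loop in deleteBlobsIgnoringIfNotExists.
    static List<List<String>> partition(List<String> keys) {
        final List<List<String>> batches = new ArrayList<>();
        List<String> current = new ArrayList<>();
        for (String key : keys) {
            current.add(key);
            if (current.size() == MAX_BULK_DELETES) {
                batches.add(current);
                current = new ArrayList<>();
            }
        }
        if (current.isEmpty() == false) {
            batches.add(current);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> keys = new ArrayList<>();
        for (int i = 0; i < 2500; i++) {
            keys.add("blob-" + i);
        }
        // 2500 keys -> 3 batches (1000, 1000, 500), i.e. three delete requests instead of 2500.
        partition(keys).forEach(batch -> System.out.println(batch.size()));
    }
}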
S3 test fixture (defaultHandlers):

@@ -324,7 +324,7 @@ private PathTrie<RequestHandler> defaultHandlers(final Map<String, Bucket> buckets) {
         // Delete Multiple Objects
         //
         // https://docs.aws.amazon.com/AmazonS3/latest/API/multiobjectdeleteapi.html
-        handlers.insert(nonAuthPath(HttpPost.METHOD_NAME, "/"), (request) -> {
+        final RequestHandler bulkDeleteHandler = request -> {
             final List<String> deletes = new ArrayList<>();
             final List<String> errors = new ArrayList<>();
@@ -344,7 +344,6 @@ private PathTrie<RequestHandler> defaultHandlers(final Map<String, Bucket> buckets) {
             if (closingOffset != -1) {
                 offset = offset + startMarker.length();
                 final String objectName = requestBody.substring(offset, closingOffset);
-
                 boolean found = false;
                 for (Bucket bucket : buckets.values()) {
                     if (bucket.objects.containsKey(objectName)) {
@@ -369,7 +368,9 @@ private PathTrie<RequestHandler> defaultHandlers(final Map<String, Bucket> buckets) {
                 }
             }
             return newInternalError(request.getId(), "Something is wrong with this POST multiple deletes request");
-        });
+        };
+        handlers.insert(nonAuthPath(HttpPost.METHOD_NAME, "/"), bulkDeleteHandler);
+        handlers.insert(nonAuthPath(HttpPost.METHOD_NAME, "/{bucket}"), bulkDeleteHandler);

         // non-authorized requests
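For context, a rough sketch of the kind of client call this fixture now has to serve on both registered paths. The bucket name and keys below are placeholders, the client construction is a stand-in for the test wiring, and the addressing remark in the comments is an assumption about the SDK rather than something stated in this change:

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;

public class BulkDeleteClientSketch {
    public static void main(String[] args) {
        // Placeholder client; in the tests the client would point at the fixture instead of real S3.
        AmazonS3 client = AmazonS3ClientBuilder.defaultClient();
        // Quiet mode: the multi-object delete response then only lists keys that failed to delete.
        DeleteObjectsRequest request = new DeleteObjectsRequest("test-bucket")
            .withKeys("path/blob-1", "path/blob-2")
            .withQuiet(true);
        // Depending on path-style vs. virtual-hosted addressing this is POSTed to "/{bucket}?delete" or "/?delete",
        // which is presumably why the fixture registers the same handler under both "/" and "/{bucket}".
        client.deleteObjects(request);
    }
}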
BlobStoreRepository:

@@ -101,7 +101,6 @@
 import java.io.FilterInputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.nio.file.DirectoryNotEmptyException;
 import java.nio.file.FileAlreadyExistsException;
 import java.nio.file.NoSuchFileException;
 import java.util.ArrayList;
@@ -466,22 +465,16 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId) {
             final Collection<IndexId> indicesToCleanUp = Sets.newHashSet(repositoryData.getIndices().values());
             indicesToCleanUp.removeAll(updatedRepositoryData.getIndices().values());
             final BlobContainer indicesBlobContainer = blobStore().blobContainer(basePath().add("indices"));
-            for (final IndexId indexId : indicesToCleanUp) {
-                try {
-                    indicesBlobContainer.deleteBlobIgnoringIfNotExists(indexId.getId());
-                } catch (DirectoryNotEmptyException dnee) {
-                    // if the directory isn't empty for some reason, it will fail to clean up;
-                    // we'll ignore that and accept that cleanup didn't fully succeed.
-                    // since we are using UUIDs for path names, this won't be an issue for
-                    // snapshotting indices of the same name
-                    logger.warn(() -> new ParameterizedMessage("[{}] index [{}] no longer part of any snapshots in the repository, " +
-                        "but failed to clean up its index folder due to the directory not being empty.", metadata.name(), indexId), dnee);
-                } catch (IOException ioe) {
-                    // a different IOException occurred while trying to delete - will just log the issue for now
-                    logger.warn(() -> new ParameterizedMessage("[{}] index [{}] no longer part of any snapshots in the repository, " +
-                        "but failed to clean up its index folder.", metadata.name(), indexId), ioe);
-                }
-            }
+            try {
+                indicesBlobContainer.deleteBlobsIgnoringIfNotExists(
+                    indicesToCleanUp.stream().map(IndexId::getId).collect(Collectors.toList()));
+            } catch (IOException ioe) {
+                // a different IOException occurred while trying to delete - will just log the issue for now
+                logger.warn(() ->
+                    new ParameterizedMessage(
+                        "[{}] indices {} are no longer part of any snapshots in the repository, " +
+                        "but failed to clean up their index folders.", metadata.name(), indicesToCleanUp), ioe);
+            }
         } catch (IOException | ResourceNotFoundException ex) {
             throw new RepositoryException(metadata.name(), "failed to delete snapshot [" + snapshotId + "]", ex);
         }
@@ -1018,16 +1011,14 @@ protected void finalize(final List<SnapshotFiles> snapshots,
         try {
             // Delete temporary index files first, as we might otherwise fail in the next step creating the new index file if an earlier
             // attempt to write an index file with this generation failed mid-way after creating the temporary file.
-            for (final String blobName : blobs.keySet()) {
-                if (FsBlobContainer.isTempBlobName(blobName)) {
-                    try {
-                        blobContainer.deleteBlobIgnoringIfNotExists(blobName);
-                    } catch (IOException e) {
-                        logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete index blob [{}] during finalization",
-                            snapshotId, shardId, blobName), e);
-                        throw e;
-                    }
-                }
+            final List<String> blobNames =
+                blobs.keySet().stream().filter(FsBlobContainer::isTempBlobName).collect(Collectors.toList());
+            try {
+                blobContainer.deleteBlobsIgnoringIfNotExists(blobNames);
+            } catch (IOException e) {
+                logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete index blobs {} during finalization",
+                    snapshotId, shardId, blobNames), e);
+                throw e;
             }

             // If we deleted all snapshots, we don't need to create a new index file
@@ -1036,28 +1027,26 @@ protected void finalize(final List<SnapshotFiles> snapshots,
             }

             // Delete old index files
-            for (final String blobName : blobs.keySet()) {
-                if (blobName.startsWith(SNAPSHOT_INDEX_PREFIX)) {
-                    try {
-                        blobContainer.deleteBlobIgnoringIfNotExists(blobName);
-                    } catch (IOException e) {
-                        logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete index blob [{}] during finalization",
-                            snapshotId, shardId, blobName), e);
-                        throw e;
-                    }
-                }
+            final List<String> indexBlobs =
+                blobs.keySet().stream().filter(blob -> blob.startsWith(SNAPSHOT_INDEX_PREFIX)).collect(Collectors.toList());
+            try {
+                blobContainer.deleteBlobsIgnoringIfNotExists(indexBlobs);
+            } catch (IOException e) {
+                logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete index blobs {} during finalization",
+                    snapshotId, shardId, indexBlobs), e);
+                throw e;
             }

             // Delete all blobs that don't exist in a snapshot
-            for (final String blobName : blobs.keySet()) {
-                if (blobName.startsWith(DATA_BLOB_PREFIX) && (updatedSnapshots.findNameFile(canonicalName(blobName)) == null)) {
-                    try {
-                        blobContainer.deleteBlobIgnoringIfNotExists(blobName);
-                    } catch (IOException e) {
-                        logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete data blob [{}] during finalization",
-                            snapshotId, shardId, blobName), e);
-                    }
-                }
+            final List<String> orphanedBlobs = blobs.keySet().stream()
+                .filter(blobName ->
+                    blobName.startsWith(DATA_BLOB_PREFIX) && updatedSnapshots.findNameFile(canonicalName(blobName)) == null)
+                .collect(Collectors.toList());
+            try {
+                blobContainer.deleteBlobsIgnoringIfNotExists(orphanedBlobs);
+            } catch (IOException e) {
+                logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete data blobs {} during finalization",
+                    snapshotId, shardId, orphanedBlobs), e);
+            }
         } catch (IOException e) {
             String message = "Failed to finalize " + reason + " with shard index [" + currentIndexGen + "]";
Review comment: perhaps link to AWS docs here