diff --git a/api/src/main/java/org/apache/iceberg/io/BulkDeletionFailureException.java b/api/src/main/java/org/apache/iceberg/io/BulkDeletionFailureException.java new file mode 100644 index 000000000000..af9513785213 --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/io/BulkDeletionFailureException.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +public class BulkDeletionFailureException extends RuntimeException { + private final int numberFailedObjects; + + public BulkDeletionFailureException(int numberFailedObjects) { + super(String.format("Failed to delete %d files", numberFailedObjects)); + this.numberFailedObjects = numberFailedObjects; + } + + public int numberFailedObjects() { + return numberFailedObjects; + } +} diff --git a/api/src/main/java/org/apache/iceberg/io/SupportsBulkOperations.java b/api/src/main/java/org/apache/iceberg/io/SupportsBulkOperations.java new file mode 100644 index 000000000000..4599029baea8 --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/io/SupportsBulkOperations.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +public interface SupportsBulkOperations { + /** + * Delete the files at the given paths. 
+ * + * @param pathsToDelete The paths to delete + * @throws BulkDeletionFailureException in case of failure to delete at least 1 file + */ + void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailureException; +} diff --git a/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIOIntegration.java b/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIOIntegration.java index ee2c27ea0b95..fbce29e05056 100644 --- a/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIOIntegration.java +++ b/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIOIntegration.java @@ -26,6 +26,7 @@ import java.security.MessageDigest; import java.security.SecureRandom; import java.util.Base64; +import java.util.List; import java.util.UUID; import javax.crypto.KeyGenerator; import javax.crypto.SecretKey; @@ -36,6 +37,7 @@ import org.apache.iceberg.aws.AwsProperties; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; @@ -67,6 +69,7 @@ public class TestS3FileIOIntegration { private static byte[] contentBytes; private static String content; private static String kmsKeyArn; + private static int deletionBatchSize; private String objectKey; private String objectUri; @@ -78,6 +81,7 @@ public static void beforeClass() { bucketName = AwsIntegTestUtil.testBucketName(); prefix = UUID.randomUUID().toString(); contentBytes = new byte[1024 * 1024 * 10]; + deletionBatchSize = 3; content = new String(contentBytes, StandardCharsets.UTF_8); kmsKeyArn = kms.createKey().keyMetadata().arn(); } @@ -208,8 +212,49 @@ public void testClientFactorySerialization() throws Exception { validateRead(fileIO2); } + @Test + public void testDeleteFilesMultipleBatches() throws Exception { + S3FileIO s3FileIO = new S3FileIO(clientFactory::s3, getDeletionTestProperties()); + testDeleteFiles(deletionBatchSize * 2, s3FileIO); + } + + @Test + public void testDeleteFilesLessThanBatchSize() throws Exception { + S3FileIO s3FileIO = new S3FileIO(clientFactory::s3, getDeletionTestProperties()); + testDeleteFiles(deletionBatchSize - 1, s3FileIO); + } + + @Test + public void testDeleteFilesSingleBatchWithRemainder() throws Exception { + S3FileIO s3FileIO = new S3FileIO(clientFactory::s3, getDeletionTestProperties()); + testDeleteFiles(5, s3FileIO); + } + + private AwsProperties getDeletionTestProperties() { + AwsProperties properties = new AwsProperties(); + properties.setS3FileIoDeleteBatchSize(deletionBatchSize); + return properties; + } + + private void testDeleteFiles(int numObjects, S3FileIO s3FileIO) throws Exception { + List<String> paths = Lists.newArrayList(); + for (int i = 1; i <= numObjects; i++) { + String deletionKey = objectKey + "-deletion-" + i; + write(s3FileIO, String.format("s3://%s/%s", bucketName, deletionKey)); + paths.add(String.format("s3://%s/%s", bucketName, deletionKey)); + } + s3FileIO.deleteFiles(paths); + for (String path : paths) { + Assert.assertFalse(s3FileIO.newInputFile(path).exists()); + } + } + private void write(S3FileIO s3FileIO) throws Exception { - OutputFile outputFile = s3FileIO.newOutputFile(objectUri); + write(s3FileIO, objectUri); + } + + private void write(S3FileIO s3FileIO, String uri) throws Exception { + OutputFile outputFile = s3FileIO.newOutputFile(uri); OutputStream outputStream = outputFile.create(); IoUtils.copy(new ByteArrayInputStream(contentBytes), outputStream); outputStream.close(); diff --git
a/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java b/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java index e023f7c3d331..dac18d34ac09 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java +++ b/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java @@ -172,6 +172,25 @@ public class AwsProperties implements Serializable { public static final String S3_CHECKSUM_ENABLED = "s3.checksum-enabled"; public static final boolean S3_CHECKSUM_ENABLED_DEFAULT = false; + /** + * Configure the batch size used when deleting multiple files from a given S3 bucket + */ + public static final String S3FILEIO_DELETE_BATCH_SIZE = "s3.delete.batch-size"; + + /** + * Default batch size used when deleting files. + *
<p>
+ * Refer to https://github.com/apache/hadoop/commit/56dee667707926f3796c7757be1a133a362f05c9 + * for more details on why this value was chosen. + */ + public static final int S3FILEIO_DELETE_BATCH_SIZE_DEFAULT = 250; + + /** + * Max possible batch size for deletion. Currently, a max of 1000 keys can be deleted in one batch. + * https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html + */ + public static final int S3FILEIO_DELETE_BATCH_SIZE_MAX = 1000; + /** * DynamoDB table name for {@link DynamoDbCatalog} */ @@ -236,6 +255,7 @@ public class AwsProperties implements Serializable { private String s3FileIoSseMd5; private int s3FileIoMultipartUploadThreads; private int s3FileIoMultiPartSize; + private int s3FileIoDeleteBatchSize; private double s3FileIoMultipartThresholdFactor; private String s3fileIoStagingDirectory; private ObjectCannedACL s3FileIoAcl; @@ -256,6 +276,7 @@ public AwsProperties() { this.s3FileIoMultipartUploadThreads = Runtime.getRuntime().availableProcessors(); this.s3FileIoMultiPartSize = S3FILEIO_MULTIPART_SIZE_DEFAULT; this.s3FileIoMultipartThresholdFactor = S3FILEIO_MULTIPART_THRESHOLD_FACTOR_DEFAULT; + this.s3FileIoDeleteBatchSize = S3FILEIO_DELETE_BATCH_SIZE_DEFAULT; this.s3fileIoStagingDirectory = System.getProperty("java.io.tmpdir"); this.isS3ChecksumEnabled = S3_CHECKSUM_ENABLED_DEFAULT; @@ -310,6 +331,12 @@ public AwsProperties(Map properties) { this.isS3ChecksumEnabled = PropertyUtil.propertyAsBoolean(properties, S3_CHECKSUM_ENABLED, S3_CHECKSUM_ENABLED_DEFAULT); + this.s3FileIoDeleteBatchSize = PropertyUtil.propertyAsInt(properties, S3FILEIO_DELETE_BATCH_SIZE, + S3FILEIO_DELETE_BATCH_SIZE_DEFAULT); + Preconditions.checkArgument(s3FileIoDeleteBatchSize > 0 && + s3FileIoDeleteBatchSize <= S3FILEIO_DELETE_BATCH_SIZE_MAX, + String.format("Deletion batch size must be between 1 and %s", S3FILEIO_DELETE_BATCH_SIZE_MAX)); + this.dynamoDbTableName = PropertyUtil.propertyAsString(properties, DYNAMODB_TABLE_NAME, DYNAMODB_TABLE_NAME_DEFAULT); } @@ -326,6 +353,14 @@ public String s3FileIoSseKey() { return s3FileIoSseKey; } + public int s3FileIoDeleteBatchSize() { + return s3FileIoDeleteBatchSize; + } + + public void setS3FileIoDeleteBatchSize(int deleteBatchSize) { + this.s3FileIoDeleteBatchSize = deleteBatchSize; + } + public void setS3FileIoSseKey(String sseKey) { this.s3FileIoSseKey = sseKey; } diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java index 7f908fa26e32..c8343486f003 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java @@ -19,6 +19,8 @@ package org.apache.iceberg.aws.s3; +import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; @@ -26,16 +28,27 @@ import org.apache.iceberg.aws.AwsClientFactories; import org.apache.iceberg.aws.AwsProperties; import org.apache.iceberg.common.DynConstructors; +import org.apache.iceberg.io.BulkDeletionFailureException; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.io.SupportsBulkOperations; import org.apache.iceberg.metrics.MetricsContext; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Multimaps; +import 
org.apache.iceberg.relocated.com.google.common.collect.SetMultimap; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.SerializableSupplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.Delete; import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; +import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest; +import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse; +import software.amazon.awssdk.services.s3.model.ObjectIdentifier; import software.amazon.awssdk.services.s3.model.Tag; /** @@ -45,7 +58,7 @@ * URIs with schemes s3a, s3n, https are also treated as s3 file paths. * Using this FileIO with other schemes will result in {@link org.apache.iceberg.exceptions.ValidationException}. */ -public class S3FileIO implements FileIO { +public class S3FileIO implements FileIO, SupportsBulkOperations { private static final Logger LOG = LoggerFactory.getLogger(S3FileIO.class); private static final String DEFAULT_METRICS_IMPL = "org.apache.iceberg.hadoop.HadoopMetricsContext"; @@ -105,6 +118,68 @@ public void deleteFile(String path) { client().deleteObject(deleteRequest); } + /** + * Deletes the given paths in a batched manner. + *
<p>
+ * The paths are grouped by bucket, and deletion is triggered when we either reach the configured batch size + * or have a final remainder batch for each bucket. + * + * @param paths paths to delete + */ + @Override + public void deleteFiles(Iterable<String> paths) throws BulkDeletionFailureException { + SetMultimap<String, String> bucketToObjects = Multimaps.newSetMultimap(Maps.newHashMap(), Sets::newHashSet); + int numberOfFailedDeletions = 0; + for (String path : paths) { + S3URI location = new S3URI(path); + String bucket = location.bucket(); + String objectKey = location.key(); + Set<String> objectsInBucket = bucketToObjects.get(bucket); + if (objectsInBucket.size() == awsProperties.s3FileIoDeleteBatchSize()) { + List<String> failedDeletionsForBatch = deleteObjectsInBucket(bucket, objectsInBucket); + numberOfFailedDeletions += failedDeletionsForBatch.size(); + failedDeletionsForBatch.forEach(failedPath -> LOG.warn("Failed to delete object at path {}", failedPath)); + bucketToObjects.removeAll(bucket); + } + bucketToObjects.get(bucket).add(objectKey); + } + + // Delete the remainder + for (Map.Entry<String, Collection<String>> bucketToObjectsEntry : bucketToObjects.asMap().entrySet()) { + final String bucket = bucketToObjectsEntry.getKey(); + final Collection<String> objects = bucketToObjectsEntry.getValue(); + List<String> failedDeletions = deleteObjectsInBucket(bucket, objects); + failedDeletions.forEach(failedPath -> LOG.warn("Failed to delete object at path {}", failedPath)); + numberOfFailedDeletions += failedDeletions.size(); + } + + if (numberOfFailedDeletions > 0) { + throw new BulkDeletionFailureException(numberOfFailedDeletions); + } + } + + private List<String> deleteObjectsInBucket(String bucket, Collection<String> objects) { + if (!objects.isEmpty()) { + List<ObjectIdentifier> objectIds = objects + .stream() + .map(objectKey -> ObjectIdentifier.builder().key(objectKey).build()) + .collect(Collectors.toList()); + DeleteObjectsRequest deleteObjectsRequest = DeleteObjectsRequest.builder() + .bucket(bucket) + .delete(Delete.builder().objects(objectIds).build()) + .build(); + DeleteObjectsResponse response = client().deleteObjects(deleteObjectsRequest); + if (response.hasErrors()) { + return response.errors() + .stream() + .map(error -> String.format("s3://%s/%s", bucket, error.key())) + .collect(Collectors.toList()); + } + } + + return Lists.newArrayList(); + } + private S3Client client() { if (client == null) { client = s3.get(); diff --git a/aws/src/test/java/org/apache/iceberg/aws/TestAwsProperties.java b/aws/src/test/java/org/apache/iceberg/aws/TestAwsProperties.java index 9e1d0bde1169..1d849370330e 100644 --- a/aws/src/test/java/org/apache/iceberg/aws/TestAwsProperties.java +++ b/aws/src/test/java/org/apache/iceberg/aws/TestAwsProperties.java @@ -97,4 +97,24 @@ public void testS3MultipartThresholdFactorLessThanOne() { () -> new AwsProperties(map)); } + @Test + public void testS3FileIoDeleteBatchSizeTooLarge() { + Map<String, String> map = Maps.newHashMap(); + map.put(AwsProperties.S3FILEIO_DELETE_BATCH_SIZE, "2000"); + AssertHelpers.assertThrows("should not accept batch size greater than 1000", + IllegalArgumentException.class, + "Deletion batch size must be between 1 and 1000", + () -> new AwsProperties(map)); + } + + @Test + public void testS3FileIoDeleteBatchSizeTooSmall() { + Map<String, String> map = Maps.newHashMap(); + map.put(AwsProperties.S3FILEIO_DELETE_BATCH_SIZE, "0"); + AssertHelpers.assertThrows("should not accept batch size less than 1", + IllegalArgumentException.class, + "Deletion batch size must be between 1 and 1000", + () -> new AwsProperties(map)); + } + } diff --git
a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIO.java b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIO.java index a08e3d9e7fda..748ca0523e30 100644 --- a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIO.java +++ b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIO.java @@ -23,44 +23,69 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.List; import java.util.Map; import java.util.Random; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.SerializationUtils; +import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.io.BulkDeletionFailureException; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.util.SerializableSupplier; +import org.junit.Assert; import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.junit.MockitoJUnitRunner; import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.CreateBucketRequest; +import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest; +import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse; +import software.amazon.awssdk.services.s3.model.S3Error; import software.amazon.awssdk.services.s3.model.Tag; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; - +import static org.mockito.AdditionalAnswers.delegatesTo; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +@RunWith(MockitoJUnitRunner.class) public class TestS3FileIO { @ClassRule public static final S3MockRule S3_MOCK_RULE = S3MockRule.builder().silent().build(); public SerializableSupplier<S3Client> s3 = S3_MOCK_RULE::createS3ClientV2; + private final S3Client s3mock = mock(S3Client.class, delegatesTo(s3.get())); private final Random random = new Random(1); - + private final int numBucketsForBatchDeletion = 3; + private final String batchDeletionBucketPrefix = "batch-delete-"; + private final int batchDeletionSize = 5; private S3FileIO s3FileIO; private final Map<String, String> properties = ImmutableMap.of( - "s3.write.tags.tagKey1", "TagValue1"); + "s3.write.tags.tagKey1", "TagValue1", + "s3.delete.batch-size", Integer.toString(batchDeletionSize)); @Before public void before() { - s3FileIO = new S3FileIO(s3); + s3FileIO = new S3FileIO(() -> s3mock); s3FileIO.initialize(properties); s3.get().createBucket(CreateBucketRequest.builder().bucket("bucket").build()); + for (int i = 1; i <= numBucketsForBatchDeletion; i++) { + s3.get().createBucket(CreateBucketRequest.builder().bucket(batchDeletionBucketPrefix + i).build()); + } } @Test @@ -91,6 +116,72 @@ public void testNewInputFile() throws IOException { assertFalse(s3FileIO.newInputFile(location).exists()); } + @Test + public void testDeleteFilesMultipleBatches() {
+ testBatchDelete(batchDeletionSize * 2); + } + + @Test + public void testDeleteFilesLessThanBatchSize() { + testBatchDelete(batchDeletionSize - 1); + } + + @Test + public void testDeleteFilesSingleBatchWithRemainder() { + testBatchDelete(batchDeletionSize + 1); + } + + @Test + public void testDeleteEmptyList() throws IOException { + String location = "s3://bucket/path/to/file.txt"; + InputFile in = s3FileIO.newInputFile(location); + assertFalse(in.exists()); + OutputFile out = s3FileIO.newOutputFile(location); + try (OutputStream os = out.createOrOverwrite()) { + IOUtils.write(new byte[1024 * 1024], os); + } + + s3FileIO.deleteFiles(Lists.newArrayList()); + + Assert.assertTrue(s3FileIO.newInputFile(location).exists()); + s3FileIO.deleteFile(in); + assertFalse(s3FileIO.newInputFile(location).exists()); + } + + @Test + public void testDeleteFilesS3ReturnsError() { + String location = "s3://bucket/path/to/file-to-delete.txt"; + DeleteObjectsResponse deleteObjectsResponse = DeleteObjectsResponse.builder() + .errors(ImmutableList.of(S3Error.builder().key("path/to/file.txt").build())) + .build(); + doReturn(deleteObjectsResponse).when(s3mock).deleteObjects((DeleteObjectsRequest) any()); + + AssertHelpers.assertThrows("A failure during S3 DeleteObjects call should result in BulkDeletionFailureException", + BulkDeletionFailureException.class, + "Failed to delete 1 file", + () -> s3FileIO.deleteFiles(Lists.newArrayList(location))); + } + + private void testBatchDelete(int numObjects) { + List<String> paths = Lists.newArrayList(); + for (int i = 1; i <= numBucketsForBatchDeletion; i++) { + String bucketName = batchDeletionBucketPrefix + i; + for (int j = 1; j <= numObjects; j++) { + String key = "object-" + j; + paths.add("s3://" + bucketName + "/" + key); + } + } + s3FileIO.deleteFiles(paths); + + int expectedNumberOfBatchesPerBucket = + (numObjects / batchDeletionSize) + (numObjects % batchDeletionSize == 0 ? 0 : 1); + int expectedDeleteRequests = expectedNumberOfBatchesPerBucket * numBucketsForBatchDeletion; + verify(s3mock, times(expectedDeleteRequests)).deleteObjects((DeleteObjectsRequest) any()); + for (String path : paths) { + Assert.assertFalse(s3FileIO.newInputFile(path).exists()); + } + } + @Test public void testSerializeClient() { SerializableSupplier<S3Client> pre = @@ -120,7 +211,7 @@ public void testWriteTags() throws IOException { // Assert for writeTags assertTrue(((S3InputFile) in).writeTags().isEmpty()); - assertEquals(((S3OutputFile) out).writeTags().size(), properties.size()); + assertEquals(((S3OutputFile) out).writeTags().size(), 1); assertEquals(((S3OutputFile) out).writeTags(), ImmutableSet.of( Tag.builder().key("tagKey1").value("TagValue1").build()));
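/*
 * Usage sketch (not part of the patch above): one way a caller might exercise the new bulk-delete
 * path introduced by this change. The bucket name, object keys, and batch size of 100 are
 * illustrative only; S3FileIO, AwsProperties, and BulkDeletionFailureException are the classes
 * added or extended in this diff. The default batch size of 250 follows the Hadoop S3A finding
 * referenced in AwsProperties, and 1000 is the hard limit of the S3 DeleteObjects API.
 */
import java.util.List;

import org.apache.iceberg.aws.AwsProperties;
import org.apache.iceberg.aws.s3.S3FileIO;
import org.apache.iceberg.io.BulkDeletionFailureException;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import software.amazon.awssdk.services.s3.S3Client;

public class BulkDeleteExample {
  public static void main(String[] args) {
    AwsProperties properties = new AwsProperties();
    // Programmatic equivalent of setting "s3.delete.batch-size" in the catalog properties.
    properties.setS3FileIoDeleteBatchSize(100);

    S3FileIO fileIO = new S3FileIO(S3Client::create, properties);

    List<String> pathsToDelete = Lists.newArrayList(
        "s3://example-bucket/data/00000-0-orphan.parquet",
        "s3://example-bucket/data/00001-0-orphan.parquet");

    try {
      // Paths are grouped by bucket and removed with DeleteObjects requests of at most the
      // configured batch size; a remainder batch is flushed per bucket at the end.
      fileIO.deleteFiles(pathsToDelete);
    } catch (BulkDeletionFailureException e) {
      // Per-object failures are logged by S3FileIO; the exception carries only the total count.
      System.err.printf("Failed to delete %d of %d files%n",
          e.numberFailedObjects(), pathsToDelete.size());
    }
  }
}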