Commit 28bbe56
Core: Add deleteFiles to FileIO interface. For S3FileIO, implement batch deletion via the S3 DeleteObjects API
1 parent: 605cdae

7 files changed (+271, −3 lines changed)


api/src/main/java/org/apache/iceberg/io/FileIO.java

Lines changed: 9 additions & 0 deletions

@@ -68,6 +68,15 @@ default void deleteFile(OutputFile file) {
   default void initialize(Map<String, String> properties) {
   }
 
+  /**
+   * Delete the files at the given paths.
+   */
+  default void deleteFiles(Iterable<String> pathsToDelete) {
+    for (String path : pathsToDelete) {
+      deleteFile(path);
+    }
+  }
+
   /**
    * Close File IO to release underlying resources.
    * <p>
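Because the new method is a default, existing FileIO implementations pick up the per-path loop without any changes. A minimal sketch of the inherited behavior (not part of this commit; the class name LoggingFileIO and its println-based delete are hypothetical):

import java.util.Arrays;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;

public class LoggingFileIO implements FileIO {
  @Override
  public InputFile newInputFile(String path) {
    throw new UnsupportedOperationException("sketch only");
  }

  @Override
  public OutputFile newOutputFile(String path) {
    throw new UnsupportedOperationException("sketch only");
  }

  @Override
  public void deleteFile(String path) {
    System.out.println("deleting " + path);  // stand-in for a real delete
  }

  public static void main(String[] args) {
    // The inherited default deleteFiles simply calls deleteFile once per path.
    new LoggingFileIO().deleteFiles(Arrays.asList("bucket/a.parquet", "bucket/b.parquet"));
  }
}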

aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIOIntegration.java

Lines changed: 46 additions & 1 deletion

@@ -26,6 +26,7 @@
 import java.security.MessageDigest;
 import java.security.SecureRandom;
 import java.util.Base64;
+import java.util.List;
 import java.util.UUID;
 import javax.crypto.KeyGenerator;
 import javax.crypto.SecretKey;
@@ -36,6 +37,7 @@
 import org.apache.iceberg.aws.AwsProperties;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
@@ -67,6 +69,7 @@ public class TestS3FileIOIntegration {
   private static byte[] contentBytes;
   private static String content;
   private static String kmsKeyArn;
+  private static int deletionBatchSize;
   private String objectKey;
   private String objectUri;
 
@@ -78,6 +81,7 @@ public static void beforeClass() {
     bucketName = AwsIntegTestUtil.testBucketName();
     prefix = UUID.randomUUID().toString();
     contentBytes = new byte[1024 * 1024 * 10];
+    deletionBatchSize = 3;
     content = new String(contentBytes, StandardCharsets.UTF_8);
     kmsKeyArn = kms.createKey().keyMetadata().arn();
   }
@@ -208,8 +212,49 @@ public void testClientFactorySerialization() throws Exception {
     validateRead(fileIO2);
   }
 
+  @Test
+  public void testDeleteMultipleBatches() throws Exception {
+    S3FileIO s3FileIO = new S3FileIO(clientFactory::s3, getDeletionTestProperties());
+    testBatchDelete(6, s3FileIO);
+  }
+
+  @Test
+  public void testDeleteLessThanBatchSize() throws Exception {
+    S3FileIO s3FileIO = new S3FileIO(clientFactory::s3, getDeletionTestProperties());
+    testBatchDelete(2, s3FileIO);
+  }
+
+  @Test
+  public void testDeleteSingleBatchWithRemainder() throws Exception {
+    S3FileIO s3FileIO = new S3FileIO(clientFactory::s3, getDeletionTestProperties());
+    testBatchDelete(5, s3FileIO);
+  }
+
+  private AwsProperties getDeletionTestProperties() {
+    AwsProperties properties = new AwsProperties();
+    properties.setS3FileIoDeleteBatchSize(deletionBatchSize);
+    return properties;
+  }
+
+  private void testBatchDelete(int numObjects, S3FileIO s3FileIO) throws Exception {
+    List<String> paths = Lists.newArrayList();
+    for (int i = 1; i <= numObjects; i++) {
+      String deletionKey = objectKey + "-deletion-" + i;
+      write(s3FileIO, String.format("s3://%s/%s", bucketName, deletionKey));
+      paths.add(String.format("s3://%s/%s", bucketName, deletionKey));
+    }
+    s3FileIO.deleteFiles(paths);
+    for (String path : paths) {
+      Assert.assertFalse(s3FileIO.newInputFile(path).exists());
+    }
+  }
+
   private void write(S3FileIO s3FileIO) throws Exception {
-    OutputFile outputFile = s3FileIO.newOutputFile(objectUri);
+    write(s3FileIO, objectUri);
+  }
+
+  private void write(S3FileIO s3FileIO, String uri) throws Exception {
+    OutputFile outputFile = s3FileIO.newOutputFile(uri);
     OutputStream outputStream = outputFile.create();
     IoUtils.copy(new ByteArrayInputStream(contentBytes), outputStream);
     outputStream.close();

aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java

Lines changed: 25 additions & 0 deletions

@@ -171,6 +171,11 @@ public class AwsProperties implements Serializable {
   public static final String S3_CHECKSUM_ENABLED = "s3.checksum-enabled";
   public static final boolean S3_CHECKSUM_ENABLED_DEFAULT = false;
 
+  /**
+   * Configure the batch size used when deleting multiple files from a given S3 bucket
+   */
+  public static final String S3FILEIO_DELETE_BATCH_SIZE = "s3.delete.batch-size";
+
   /**
    * DynamoDB table name for {@link DynamoDbCatalog}
    */
@@ -221,11 +226,17 @@
   @Deprecated
   public static final boolean CLIENT_ENABLE_ETAG_CHECK_DEFAULT = false;
 
+  /**
+   * Sets the size for batch deletion.
+   */
+  public static final int S3FILEIO_DELETE_BATCH_SIZE_DEFAULT = 250;
+
   private String s3FileIoSseType;
   private String s3FileIoSseKey;
   private String s3FileIoSseMd5;
   private int s3FileIoMultipartUploadThreads;
   private int s3FileIoMultiPartSize;
+  private int s3FileIoDeleteBatchSize;
   private double s3FileIoMultipartThresholdFactor;
   private String s3fileIoStagingDirectory;
   private ObjectCannedACL s3FileIoAcl;
@@ -246,6 +257,7 @@ public AwsProperties() {
     this.s3FileIoMultipartUploadThreads = Runtime.getRuntime().availableProcessors();
     this.s3FileIoMultiPartSize = S3FILEIO_MULTIPART_SIZE_DEFAULT;
     this.s3FileIoMultipartThresholdFactor = S3FILEIO_MULTIPART_THRESHOLD_FACTOR_DEFAULT;
+    this.s3FileIoDeleteBatchSize = S3FILEIO_DELETE_BATCH_SIZE_DEFAULT;
     this.s3fileIoStagingDirectory = System.getProperty("java.io.tmpdir");
     this.isS3ChecksumEnabled = S3_CHECKSUM_ENABLED_DEFAULT;
 
@@ -300,6 +312,11 @@ public AwsProperties(Map<String, String> properties) {
     this.isS3ChecksumEnabled = PropertyUtil.propertyAsBoolean(properties, S3_CHECKSUM_ENABLED,
         S3_CHECKSUM_ENABLED_DEFAULT);
 
+    this.s3FileIoDeleteBatchSize = PropertyUtil.propertyAsInt(properties, S3FILEIO_DELETE_BATCH_SIZE,
+        S3FILEIO_DELETE_BATCH_SIZE_DEFAULT);
+    Preconditions.checkArgument(s3FileIoDeleteBatchSize > 0 && s3FileIoDeleteBatchSize < 1000,
+        "Deletion batch size must be between 1 and 1000");
+
     this.dynamoDbTableName = PropertyUtil.propertyAsString(properties, DYNAMODB_TABLE_NAME,
         DYNAMODB_TABLE_NAME_DEFAULT);
   }
@@ -316,6 +333,14 @@ public String s3FileIoSseKey() {
     return s3FileIoSseKey;
   }
 
+  public int s3FileIoDeleteBatchSize() {
+    return s3FileIoDeleteBatchSize;
+  }
+
+  public void setS3FileIoDeleteBatchSize(int deleteBatchSize) {
+    this.s3FileIoDeleteBatchSize = deleteBatchSize;
+  }
+
   public void setS3FileIoSseKey(String sseKey) {
     this.s3FileIoSseKey = sseKey;
   }
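A short sketch of how the new property flows through AwsProperties, using only the key and accessor added above (the class name DeleteBatchSizeExample is a placeholder; values rejected by the Preconditions check throw IllegalArgumentException):

import java.util.Map;
import org.apache.iceberg.aws.AwsProperties;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

public class DeleteBatchSizeExample {
  public static void main(String[] args) {
    // Catalog/table properties map, as a user would supply it.
    Map<String, String> props = ImmutableMap.of(AwsProperties.S3FILEIO_DELETE_BATCH_SIZE, "500");
    AwsProperties awsProperties = new AwsProperties(props);
    System.out.println(awsProperties.s3FileIoDeleteBatchSize());  // prints 500

    // An out-of-range value fails the Preconditions check at construction:
    // throws IllegalArgumentException("Deletion batch size must be between 1 and 1000")
    new AwsProperties(ImmutableMap.of(AwsProperties.S3FILEIO_DELETE_BATCH_SIZE, "0"));
  }
}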
aws/src/main/java/org/apache/iceberg/aws/s3/S3BatchDeletionException.java

Lines changed: 26 additions & 0 deletions

@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aws.s3;
+
+public class S3BatchDeletionException extends RuntimeException {
+  public S3BatchDeletionException(String message) {
+    super(message);
+  }
+}

aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java

Lines changed: 74 additions & 0 deletions

@@ -19,17 +19,31 @@
 
 package org.apache.iceberg.aws.s3;
 
+import java.util.Collection;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
 import org.apache.iceberg.aws.AwsClientFactories;
 import org.apache.iceberg.aws.AwsClientFactory;
 import org.apache.iceberg.aws.AwsProperties;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.relocated.com.google.common.collect.SetMultimap;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.util.SerializableSupplier;
 import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.Delete;
 import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
+import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
+import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse;
+import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
+import software.amazon.awssdk.services.s3.model.S3Error;
 
 /**
  * FileIO implementation backed by S3.
@@ -94,6 +108,66 @@ public void deleteFile(String path) {
     client().deleteObject(deleteRequest);
   }
 
+  /**
+   * Deletes the given paths in a batched manner.
+   * <p>
+   * The paths are grouped by bucket, and deletion is triggered when we either reach the configured batch size
+   * or have a final remainder batch for each bucket.
+   *
+   * @param paths paths to delete
+   */
+  @Override
+  public void deleteFiles(Iterable<String> paths) {
+    SetMultimap<String, String> bucketToObjects = Multimaps.newSetMultimap(Maps.newHashMap(), Sets::newHashSet);
+    List<String> failedDeletions = Lists.newArrayList();
+    for (String path : paths) {
+      S3URI location = new S3URI(path);
+      String bucket = location.bucket();
+      String objectKey = location.key();
+      Set<String> objectsInBucket = bucketToObjects.get(bucket);
+      if (objectsInBucket.size() == awsProperties.s3FileIoDeleteBatchSize()) {
+        List<String> failedDeletionsForBatch = deleteObjectsInBucket(bucket, objectsInBucket);
+        failedDeletions.addAll(failedDeletionsForBatch);
+        bucketToObjects.removeAll(bucket);
+      }
+      bucketToObjects.get(bucket).add(objectKey);
+    }
+    // Delete the remainder
+    List<List<String>> remainderFailedObjects = bucketToObjects
+        .asMap()
+        .entrySet()
+        .stream()
+        .map(entry -> deleteObjectsInBucket(entry.getKey(), entry.getValue()))
+        .collect(Collectors.toList());
+
+    remainderFailedObjects.forEach(failedDeletions::addAll);
+    if (!failedDeletions.isEmpty()) {
+      throw new S3BatchDeletionException(String.format("Failed to delete %d objects. Failed objects: %s",
+          failedDeletions.size(), failedDeletions));
+    }
+  }
+
+  private List<String> deleteObjectsInBucket(String bucket, Collection<String> objects) {
+    if (!objects.isEmpty()) {
+      List<ObjectIdentifier> objectIds = objects
+          .stream()
+          .map(objectKey -> ObjectIdentifier.builder().key(objectKey).build())
+          .collect(Collectors.toList());
+      DeleteObjectsRequest deleteObjectsRequest = DeleteObjectsRequest.builder()
+          .bucket(bucket)
+          .delete(Delete.builder().objects(objectIds).build())
+          .build();
+      DeleteObjectsResponse response = client().deleteObjects(deleteObjectsRequest);
+      if (response.hasErrors()) {
+        return response.errors()
+            .stream()
+            .map(S3Error::key)
+            .collect(Collectors.toList());
+      }
+    }
+    return Lists.newArrayList();
+  }
+
   private S3Client client() {
     if (client == null) {
       client = s3.get();
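Putting it together, a hedged usage sketch (not part of the diff): creating an S3FileIO with a custom batch size and deleting objects across two buckets in one call. The bucket and key names are placeholders, and S3Client::create stands in for whatever client factory you use.

import java.util.List;
import org.apache.iceberg.aws.AwsProperties;
import org.apache.iceberg.aws.s3.S3FileIO;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import software.amazon.awssdk.services.s3.S3Client;

public class BatchDeleteExample {
  public static void main(String[] args) {
    AwsProperties properties = new AwsProperties();
    properties.setS3FileIoDeleteBatchSize(100);
    S3FileIO fileIO = new S3FileIO(S3Client::create, properties);

    List<String> paths = ImmutableList.of(
        "s3://my-bucket/data/00000.parquet",
        "s3://my-bucket/data/00001.parquet",
        "s3://other-bucket/data/00002.parquet");

    // Keys are grouped by bucket, so this issues one DeleteObjects request per
    // bucket; any keys S3 reports as failed surface in an S3BatchDeletionException.
    fileIO.deleteFiles(paths);
  }
}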

aws/src/test/java/org/apache/iceberg/aws/TestAwsProperties.java

Lines changed: 20 additions & 0 deletions

@@ -97,4 +97,24 @@ public void testS3MultipartThresholdFactorLessThanOne() {
         () -> new AwsProperties(map));
   }
 
+  @Test
+  public void testS3FileIoDeleteBatchSizeTooLarge() {
+    Map<String, String> map = Maps.newHashMap();
+    map.put(AwsProperties.S3FILEIO_DELETE_BATCH_SIZE, "2000");
+    AssertHelpers.assertThrows("should not accept batch size greater than 1000",
+        IllegalArgumentException.class,
+        "Deletion batch size must be between 1 and 1000",
+        () -> new AwsProperties(map));
+  }
+
+  @Test
+  public void testS3FileIoDeleteBatchSizeTooSmall() {
+    Map<String, String> map = Maps.newHashMap();
+    map.put(AwsProperties.S3FILEIO_DELETE_BATCH_SIZE, "0");
+    AssertHelpers.assertThrows("should not accept batch size less than 1",
+        IllegalArgumentException.class,
+        "Deletion batch size must be between 1 and 1000",
+        () -> new AwsProperties(map));
+  }
+
 }
