28 changes: 27 additions & 1 deletion core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java
@@ -23,6 +23,8 @@
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.exceptions.ValidationException;
@@ -34,11 +36,12 @@
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.SerializableMap;
import org.apache.iceberg.util.SerializableSupplier;
import org.apache.iceberg.util.Tasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** FileIO implementation that uses location scheme to choose the correct FileIO implementation. */
public class ResolvingFileIO implements FileIO, HadoopConfigurable {
public class ResolvingFileIO implements FileIO, HadoopConfigurable, SupportsBulkOperations {
private static final Logger LOG = LoggerFactory.getLogger(ResolvingFileIO.class);
private static final String FALLBACK_IMPL = "org.apache.iceberg.hadoop.HadoopFileIO";
private static final String S3_FILE_IO_IMPL = "org.apache.iceberg.aws.s3.S3FileIO";
@@ -83,6 +86,29 @@ public void deleteFile(String location) {
io(location).deleteFile(location);
}

@Override
public void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailureException {
Map<FileIO, List<String>> pathByFileIO =
StreamSupport.stream(pathsToDelete.spliterator(), false)
.collect(Collectors.groupingBy(this::io));
for (Map.Entry<FileIO, List<String>> entries : pathByFileIO.entrySet()) {
FileIO io = entries.getKey();
if (io instanceof SupportsBulkOperations) {
((SupportsBulkOperations) io).deleteFiles(entries.getValue());
} else {
LOG.warn(
"IO {} does not support bulk operations. Using non-bulk deletes.",
io.getClass().getName());
Tasks.Builder<String> deleteTasks =
Tasks.foreach(entries.getValue())
.noRetry()
.suppressFailureWhenFinished()
.onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc));
deleteTasks.run(io::deleteFile);
}
}
}

@Override
public Map<String, String> properties() {
return properties.immutableMap();
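For context, a minimal caller-side sketch of the new API (not part of this diff): it assumes the delegate FileIO implementations for the referenced schemes are on the classpath (e.g. iceberg-aws for S3FileIO), and the bucket and paths are hypothetical. ResolvingFileIO groups the paths by resolved FileIO and uses bulk deletes where the delegate supports them, falling back to per-file deletes otherwise.

import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.io.ResolvingFileIO;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

public class BulkDeleteExample {
  public static void main(String[] args) {
    // Sketch only: scheme resolution follows ResolvingFileIO's existing io(location) lookup.
    ResolvingFileIO io = new ResolvingFileIO();
    io.setConf(new Configuration());
    io.initialize(ImmutableMap.of());

    // Mixed schemes: s3 paths resolve to S3FileIO, local paths to HadoopFileIO (paths are hypothetical).
    List<String> paths =
        Arrays.asList(
            "s3://bucket/data/file-a.parquet",
            "s3://bucket/data/file-b.parquet",
            "file:/tmp/warehouse/file-c.parquet");

    // One call; paths are grouped by resolved FileIO and deleted per group.
    io.deleteFiles(paths);
    io.close();
  }
}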
44 changes: 44 additions & 0 deletions core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java
@@ -20,13 +20,26 @@

import static org.assertj.core.api.Assertions.assertThat;

import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.hadoop.HadoopConfigurable;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

public class TestResolvingIO {

@TempDir private File tempDir;

@Test
public void testResolvingFileIOKryoSerialization() throws IOException {
FileIO testResolvingFileIO = new ResolvingFileIO();
@@ -47,4 +60,35 @@ public void testResolvingFileIOJavaSerialization() throws IOException, ClassNotFoundException {
FileIO roundTripSerializedFileIO = TestHelpers.roundTripSerialize(testResolvingFileIO);
assertThat(roundTripSerializedFileIO.properties()).isEqualTo(testResolvingFileIO.properties());
}

@Test
public void resolveFileIOBulkDeletion() throws IOException {
// configure resolving fileIO
FileIO resolvingFileIO = new ResolvingFileIO();
Configuration hadoopConf = new Configuration();
((HadoopConfigurable) resolvingFileIO).setConf(hadoopConf);
resolvingFileIO.initialize(
ImmutableMap.of("io-impl", "org.apache.iceberg.hadoop.HadoopFileIO"));
// configure delegation IO
HadoopFileIO delegate = new HadoopFileIO(hadoopConf);
FileSystem fs = FileSystem.getLocal(hadoopConf);
Path parent = new Path(tempDir.toURI());
// write
List<Path> randomFilePaths =
IntStream.range(1, 10)
.mapToObj(i -> new Path(parent, "random-" + i + "-" + UUID.randomUUID()))
.collect(Collectors.toList());
for (Path randomFilePath : randomFilePaths) {
fs.createNewFile(randomFilePath);
assertThat(delegate.newInputFile(randomFilePath.toUri().toString()).exists()).isTrue();
}
// bulk deletion
List<String> randomFilePathString =
randomFilePaths.stream().map(p -> p.toUri().toString()).collect(Collectors.toList());
((SupportsBulkOperations) resolvingFileIO).deleteFiles(randomFilePathString);

for (String path : randomFilePathString) {
assertThat(delegate.newInputFile(path).exists()).isFalse();
}
}
}
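The test above exercises the bulk path through a Hadoop-backed delegate. For illustration, here is a standalone sketch of the fallback branch in deleteFiles, where a resolved FileIO does not implement SupportsBulkOperations and Tasks.foreach issues one deleteFile call per path while suppressing individual failures. The NonBulkFileIO class is hypothetical and exists only to force that branch; it is not part of the PR.

import java.util.Arrays;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.util.Tasks;

class NonBulkDeleteSketch {
  // A FileIO that deliberately lacks SupportsBulkOperations (hypothetical, for illustration only).
  static class NonBulkFileIO implements FileIO {
    @Override
    public InputFile newInputFile(String path) {
      throw new UnsupportedOperationException();
    }

    @Override
    public OutputFile newOutputFile(String path) {
      throw new UnsupportedOperationException();
    }

    @Override
    public void deleteFile(String path) {
      System.out.println("deleted " + path);
    }
  }

  public static void main(String[] args) {
    FileIO io = new NonBulkFileIO();
    // Mirrors the non-bulk fallback in ResolvingFileIO.deleteFiles: no retries, failures logged, not thrown.
    Tasks.foreach(Arrays.asList("a", "b", "c"))
        .noRetry()
        .suppressFailureWhenFinished()
        .onFailure((file, exc) -> System.err.println("Failed to delete file: " + file))
        .run(io::deleteFile);
  }
}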