Skip to content
38 changes: 36 additions & 2 deletions core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,28 @@
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.hadoop.HadoopConfigurable;
import org.apache.iceberg.hadoop.SerializableConfiguration;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.SerializableMap;
import org.apache.iceberg.util.SerializableSupplier;
import org.apache.iceberg.util.Tasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** FileIO implementation that uses location scheme to choose the correct FileIO implementation. */
public class ResolvingFileIO implements FileIO, HadoopConfigurable {
public class ResolvingFileIO implements FileIO, HadoopConfigurable, SupportsBulkOperations {
private static final Logger LOG = LoggerFactory.getLogger(ResolvingFileIO.class);
private static final int BATCH_SIZE = 100_000;
private static final String FALLBACK_IMPL = "org.apache.iceberg.hadoop.HadoopFileIO";
private static final String S3_FILE_IO_IMPL = "org.apache.iceberg.aws.s3.S3FileIO";
private static final String GCS_FILE_IO_IMPL = "org.apache.iceberg.gcp.gcs.GCSFileIO";
Expand Down Expand Up @@ -85,6 +90,34 @@ public void deleteFile(String location) {
io(location).deleteFile(location);
}

@Override
public void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailureException {
  // Aggregate failures across all batches so a failure in one batch does not
  // prevent the remaining paths from being deleted.
  AtomicInteger failureCount = new AtomicInteger(0);

  Iterators.partition(pathsToDelete.iterator(), BATCH_SIZE)
      .forEachRemaining(
          partitioned -> {
            // One batch may contain locations served by different FileIO
            // implementations (e.g. s3:// and gs://), so group per delegate.
            Map<FileIO, List<String>> pathByFileIO =
                partitioned.stream().collect(Collectors.groupingBy(this::io));
            for (Map.Entry<FileIO, List<String>> entries : pathByFileIO.entrySet()) {
              FileIO io = entries.getKey();
              List<String> filePaths = entries.getValue();
              if (io instanceof SupportsBulkOperations) {
                try {
                  ((SupportsBulkOperations) io).deleteFiles(filePaths);
                } catch (BulkDeletionFailureException e) {
                  // record partial failures but keep deleting remaining batches
                  failureCount.addAndGet(e.numberFailedObjects());
                }
              } else {
                LOG.warn(
                    "IO {} does not support bulk operations. Using non-bulk deletes.",
                    io.getClass().getName());
                Tasks.foreach(filePaths)
                    .noRetry()
                    .suppressFailureWhenFinished()
                    .onFailure(
                        (file, exc) -> {
                          failureCount.incrementAndGet();
                          LOG.warn("Failed to delete file: {}", file, exc);
                        })
                    .run(io::deleteFile);
              }
            }
          });

  // Honor the declared contract: surface the aggregated failure count instead
  // of silently swallowing non-bulk delete failures.
  if (failureCount.get() > 0) {
    throw new BulkDeletionFailureException(failureCount.get());
  }
}

@Override
public Map<String, String> properties() {
return properties.immutableMap();
Expand Down Expand Up @@ -129,7 +162,8 @@ public Configuration getConf() {
return hadoopConf.get();
}

private FileIO io(String location) {
@VisibleForTesting
FileIO io(String location) {
String impl = implFromLocation(location);
FileIO io = ioInstances.get(impl);
if (io != null) {
Expand Down
66 changes: 64 additions & 2 deletions core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,28 @@
package org.apache.iceberg.io;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;

import java.io.IOException;
import java.nio.file.Path;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.inmemory.InMemoryFileIO;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

public class TestResolvingIO {

@TempDir private Path temp;
@TempDir private java.nio.file.Path temp;

@Test
public void testResolvingFileIOKryoSerialization() throws IOException {
Expand Down Expand Up @@ -91,4 +100,57 @@ public void testResolvingFileIOWithHadoopFileIOJavaSerialization()
assertThat(roundTripSerializedFileIO.ioClass(temp.toString())).isEqualTo(HadoopFileIO.class);
assertThat(roundTripSerializedFileIO.newInputFile(temp.toString())).isNotNull();
}

@Test
public void resolveFileIOBulkDeletion() throws IOException {
  ResolvingFileIO resolvingIO = spy(new ResolvingFileIO());
  Configuration conf = new Configuration();
  FileSystem localFs = FileSystem.getLocal(conf);
  Path baseDir = new Path(temp.toUri());

  // route every location to a single Hadoop-backed delegate
  HadoopFileIO delegate = new HadoopFileIO(conf);
  doReturn(delegate).when(resolvingIO).io(anyString());

  // materialize nine files under the temp dir and record their URIs
  List<String> fileLocations =
      IntStream.rangeClosed(1, 9)
          .mapToObj(idx -> new Path(baseDir, "random-" + idx + "-" + UUID.randomUUID()))
          .map(path -> path.toUri().toString())
          .collect(Collectors.toList());
  for (String location : fileLocations) {
    localFs.createNewFile(new Path(location));
    assertThat(delegate.newInputFile(location).exists()).isTrue();
  }

  // exercise the bulk-deletion path through ResolvingFileIO
  resolvingIO.deleteFiles(fileLocations);

  for (String location : fileLocations) {
    assertThat(delegate.newInputFile(location).exists()).isFalse();
  }
}

@Test
public void resolveFileIONonBulkDeletion() {
ResolvingFileIO resolvingFileIO = spy(new ResolvingFileIO());
String parentPath = "inmemory://foo.db/bar";
// configure delegation IO
InMemoryFileIO delegation = new InMemoryFileIO();
doReturn(delegation).when(resolvingFileIO).io(anyString());
// write
byte[] someData = "some data".getBytes();
List<String> randomFilePaths =
IntStream.range(1, 10)
.mapToObj(i -> parentPath + "-" + i + "-" + UUID.randomUUID())
.collect(Collectors.toList());
for (String randomFilePath : randomFilePaths) {
delegation.addFile(randomFilePath, someData);
assertThat(delegation.fileExists(randomFilePath)).isTrue();
}
// non-bulk deletion
resolvingFileIO.deleteFiles(randomFilePaths);

for (String path : randomFilePaths) {
assertThat(delegation.fileExists(path)).isFalse();
}
}
}