diff --git a/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java b/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java index d927c0a16448..63e223792276 100644 --- a/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java +++ b/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java @@ -23,23 +23,28 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; +import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.hadoop.HadoopConfigurable; import org.apache.iceberg.hadoop.SerializableConfiguration; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.Joiner; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Iterators; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.util.SerializableMap; import org.apache.iceberg.util.SerializableSupplier; +import org.apache.iceberg.util.Tasks; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** FileIO implementation that uses location scheme to choose the correct FileIO implementation. */ -public class ResolvingFileIO implements FileIO, HadoopConfigurable { +public class ResolvingFileIO implements FileIO, HadoopConfigurable, SupportsBulkOperations { private static final Logger LOG = LoggerFactory.getLogger(ResolvingFileIO.class); + private static final int BATCH_SIZE = 100_000; private static final String FALLBACK_IMPL = "org.apache.iceberg.hadoop.HadoopFileIO"; private static final String S3_FILE_IO_IMPL = "org.apache.iceberg.aws.s3.S3FileIO"; private static final String GCS_FILE_IO_IMPL = "org.apache.iceberg.gcp.gcs.GCSFileIO"; @@ -85,6 +90,34 @@ public void deleteFile(String location) { io(location).deleteFile(location); } + @Override + public void deleteFiles(Iterable pathsToDelete) throws BulkDeletionFailureException { + Iterators.partition(pathsToDelete.iterator(), BATCH_SIZE) + .forEachRemaining( + partitioned -> { + Map> pathByFileIO = + partitioned.stream().collect(Collectors.groupingBy(this::io)); + for (Map.Entry> entries : pathByFileIO.entrySet()) { + FileIO io = entries.getKey(); + List filePaths = entries.getValue(); + if (io instanceof SupportsBulkOperations) { + ((SupportsBulkOperations) io).deleteFiles(filePaths); + } else { + LOG.warn( + "IO {} does not support bulk operations. Using non-bulk deletes.", + io.getClass().getName()); + Tasks.Builder deleteTasks = + Tasks.foreach(filePaths) + .noRetry() + .suppressFailureWhenFinished() + .onFailure( + (file, exc) -> LOG.warn("Failed to delete file: {}", file, exc)); + deleteTasks.run(io::deleteFile); + } + } + }); + } + @Override public Map properties() { return properties.immutableMap(); @@ -129,7 +162,8 @@ public Configuration getConf() { return hadoopConf.get(); } - private FileIO io(String location) { + @VisibleForTesting + FileIO io(String location) { String impl = implFromLocation(location); FileIO io = ioInstances.get(impl); if (io != null) { diff --git a/core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java b/core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java index 9ba00d6b3e6c..ff1499bff777 100644 --- a/core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java +++ b/core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java @@ -19,19 +19,28 @@ package org.apache.iceberg.io; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.spy; import java.io.IOException; -import java.nio.file.Path; +import java.util.List; +import java.util.UUID; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.iceberg.TestHelpers; import org.apache.iceberg.hadoop.HadoopFileIO; +import org.apache.iceberg.inmemory.InMemoryFileIO; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; public class TestResolvingIO { - @TempDir private Path temp; + @TempDir private java.nio.file.Path temp; @Test public void testResolvingFileIOKryoSerialization() throws IOException { @@ -91,4 +100,57 @@ public void testResolvingFileIOWithHadoopFileIOJavaSerialization() assertThat(roundTripSerializedFileIO.ioClass(temp.toString())).isEqualTo(HadoopFileIO.class); assertThat(roundTripSerializedFileIO.newInputFile(temp.toString())).isNotNull(); } + + @Test + public void resolveFileIOBulkDeletion() throws IOException { + ResolvingFileIO resolvingFileIO = spy(new ResolvingFileIO()); + Configuration hadoopConf = new Configuration(); + FileSystem fs = FileSystem.getLocal(hadoopConf); + Path parent = new Path(temp.toUri()); + // configure delegation IO + HadoopFileIO delegation = new HadoopFileIO(hadoopConf); + doReturn(delegation).when(resolvingFileIO).io(anyString()); + // write + List randomFilePaths = + IntStream.range(1, 10) + .mapToObj(i -> new Path(parent, "random-" + i + "-" + UUID.randomUUID())) + .collect(Collectors.toList()); + for (Path randomFilePath : randomFilePaths) { + fs.createNewFile(randomFilePath); + assertThat(delegation.newInputFile(randomFilePath.toUri().toString()).exists()).isTrue(); + } + // bulk deletion + List randomFilePathString = + randomFilePaths.stream().map(p -> p.toUri().toString()).collect(Collectors.toList()); + resolvingFileIO.deleteFiles(randomFilePathString); + + for (String path : randomFilePathString) { + assertThat(delegation.newInputFile(path).exists()).isFalse(); + } + } + + @Test + public void resolveFileIONonBulkDeletion() { + ResolvingFileIO resolvingFileIO = spy(new ResolvingFileIO()); + String parentPath = "inmemory://foo.db/bar"; + // configure delegation IO + InMemoryFileIO delegation = new InMemoryFileIO(); + doReturn(delegation).when(resolvingFileIO).io(anyString()); + // write + byte[] someData = "some data".getBytes(); + List randomFilePaths = + IntStream.range(1, 10) + .mapToObj(i -> parentPath + "-" + i + "-" + UUID.randomUUID()) + .collect(Collectors.toList()); + for (String randomFilePath : randomFilePaths) { + delegation.addFile(randomFilePath, someData); + assertThat(delegation.fileExists(randomFilePath)).isTrue(); + } + // non-bulk deletion + resolvingFileIO.deleteFiles(randomFilePaths); + + for (String path : randomFilePaths) { + assertThat(delegation.fileExists(path)).isFalse(); + } + } }