From 0166991b19c71c5f4c61fe24874b2fbbd7b8bd6f Mon Sep 17 00:00:00 2001
From: Steve Zhang
Date: Mon, 3 Jul 2023 16:37:34 -0700
Subject: [PATCH 1/8] Core: Extend ResolvingFileIO to support BulkOperations

---
 .../apache/iceberg/io/ResolvingFileIO.java    | 28 +++++++++++-
 .../apache/iceberg/io/TestResolvingIO.java    | 44 +++++++++++++++++++
 2 files changed, 71 insertions(+), 1 deletion(-)

diff --git a/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java b/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java
index f7369224ee29..61b68398a752 100644
--- a/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java
+++ b/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java
@@ -23,6 +23,8 @@
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.exceptions.ValidationException;
@@ -34,11 +36,12 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.util.SerializableMap;
 import org.apache.iceberg.util.SerializableSupplier;
+import org.apache.iceberg.util.Tasks;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 /** FileIO implementation that uses location scheme to choose the correct FileIO implementation. */
-public class ResolvingFileIO implements FileIO, HadoopConfigurable {
+public class ResolvingFileIO implements FileIO, HadoopConfigurable, SupportsBulkOperations {
   private static final Logger LOG = LoggerFactory.getLogger(ResolvingFileIO.class);
   private static final String FALLBACK_IMPL = "org.apache.iceberg.hadoop.HadoopFileIO";
   private static final String S3_FILE_IO_IMPL = "org.apache.iceberg.aws.s3.S3FileIO";
@@ -83,6 +86,29 @@ public void deleteFile(String location) {
     io(location).deleteFile(location);
   }

+  @Override
+  public void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailureException {
+    Map<FileIO, List<String>> pathByFileIO =
+        StreamSupport.stream(pathsToDelete.spliterator(), false)
+            .collect(Collectors.groupingBy(this::io));
+    for (Map.Entry<FileIO, List<String>> entries : pathByFileIO.entrySet()) {
+      FileIO io = entries.getKey();
+      if (io instanceof SupportsBulkOperations) {
+        ((SupportsBulkOperations) io).deleteFiles(entries.getValue());
+      } else {
+        LOG.warn(
+            "IO {} does not support bulk operations. Using non-bulk deletes.",
+            io.getClass().getName());
+        Tasks.Builder<String> deleteTasks =
+            Tasks.foreach(entries.getValue())
+                .noRetry()
+                .suppressFailureWhenFinished()
+                .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc));
+        deleteTasks.run(io::deleteFile);
+      }
+    }
+  }
+
   @Override
   public Map<String, String> properties() {
     return properties.immutableMap();
diff --git a/core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java b/core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java
index 8745c45c72ea..954ee32a12bb 100644
--- a/core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java
+++ b/core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java
@@ -20,13 +20,26 @@

 import static org.assertj.core.api.Assertions.assertThat;

+import java.io.File;
 import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.hadoop.HadoopConfigurable;
+import org.apache.iceberg.hadoop.HadoopFileIO;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;

 public class TestResolvingIO {

+  @TempDir private File tempDir;
+
   @Test
   public void testResolvingFileIOKryoSerialization() throws IOException {
     FileIO testResolvingFileIO = new ResolvingFileIO();
@@ -47,4 +60,35 @@ public void testResolvingFileIOJavaSerialization() throws IOException, ClassNotF
     FileIO roundTripSerializedFileIO = TestHelpers.roundTripSerialize(testResolvingFileIO);
     assertThat(roundTripSerializedFileIO.properties()).isEqualTo(testResolvingFileIO.properties());
   }
+
+  @Test
+  public void resolveFileIOBulkDeletion() throws IOException {
+    // configure resolving fileIO
+    FileIO resolvingFileIO = new ResolvingFileIO();
+    Configuration hadoopConf = new Configuration();
+    ((HadoopConfigurable) resolvingFileIO).setConf(hadoopConf);
+    resolvingFileIO.initialize(
+        ImmutableMap.of("io-impl", "org.apache.iceberg.hadoop.HadoopFileIO"));
+    // configure delegation IO
+    HadoopFileIO delegate = new HadoopFileIO(hadoopConf);
+    FileSystem fs = FileSystem.getLocal(hadoopConf);
+    Path parent = new Path(tempDir.toURI());
+    // write
+    List<Path> randomFilePaths =
+        IntStream.range(1, 10)
+            .mapToObj(i -> new Path(parent, "random-" + i + "-" + UUID.randomUUID()))
+            .collect(Collectors.toList());
+    for (Path randomFilePath : randomFilePaths) {
+      fs.createNewFile(randomFilePath);
+      assertThat(delegate.newInputFile(randomFilePath.toUri().toString()).exists()).isTrue();
+    }
+    // bulk deletion
+    List<String> randomFilePathString =
+        randomFilePaths.stream().map(p -> p.toUri().toString()).collect(Collectors.toList());
+    ((SupportsBulkOperations) resolvingFileIO).deleteFiles(randomFilePathString);
+
+    for (String path : randomFilePathString) {
+      assertThat(delegate.newInputFile(path).exists()).isFalse();
+    }
+  }
 }
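For context on how this change is consumed: once ResolvingFileIO implements SupportsBulkOperations, a caller that previously looped over deleteFile() can pass the whole collection in a single call and let the IO decide how to delete. A minimal caller-side sketch under that assumption (the helper class and the orphanFiles variable below are illustrative, not part of this patch):

    import java.util.List;
    import org.apache.iceberg.io.FileIO;
    import org.apache.iceberg.io.SupportsBulkOperations;

    public class BulkDeleteExample {
      // Delete a batch of paths, using bulk deletion when the FileIO supports it.
      static void deleteAll(FileIO io, List<String> orphanFiles) {
        if (io instanceof SupportsBulkOperations) {
          // With this patch, ResolvingFileIO takes this branch and internally groups
          // the paths by the underlying FileIO chosen for each location scheme.
          ((SupportsBulkOperations) io).deleteFiles(orphanFiles);
        } else {
          // Mirrors the fallback in the patch: one deleteFile call per path.
          orphanFiles.forEach(io::deleteFile);
        }
      }
    }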
From 76fe70a7bd64573c13e51327918eacff37b086ed Mon Sep 17 00:00:00 2001
From: Hongyue/Steve Zhang
Date: Fri, 4 Aug 2023 11:07:06 -0700
Subject: [PATCH 2/8] Update core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java

Co-authored-by: Eduard Tudenhoefner
---
 core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java b/core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java
index 954ee32a12bb..eabc1b18e28e 100644
--- a/core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java
+++ b/core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java
@@ -64,7 +64,7 @@ public void testResolvingFileIOJavaSerialization() throws IOException, ClassNotF
   @Test
   public void resolveFileIOBulkDeletion() throws IOException {
     // configure resolving fileIO
-    FileIO resolvingFileIO = new ResolvingFileIO();
+    ResolvingFileIO resolvingFileIO = new ResolvingFileIO();
     Configuration hadoopConf = new Configuration();
     ((HadoopConfigurable) resolvingFileIO).setConf(hadoopConf);
     resolvingFileIO.initialize(

From f47cfed1c0c6e4997c6a36765225dde494d719d7 Mon Sep 17 00:00:00 2001
From: Steve Zhang
Date: Fri, 4 Aug 2023 11:11:21 -0700
Subject: [PATCH 3/8] Remove fileIO casting

---
 .../src/test/java/org/apache/iceberg/io/TestResolvingIO.java | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java b/core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java
index eabc1b18e28e..130197c93433 100644
--- a/core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java
+++ b/core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java
@@ -30,7 +30,6 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.TestHelpers;
-import org.apache.iceberg.hadoop.HadoopConfigurable;
 import org.apache.iceberg.hadoop.HadoopFileIO;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.junit.jupiter.api.Test;
@@ -66,7 +65,7 @@ public void resolveFileIOBulkDeletion() throws IOException {
     // configure resolving fileIO
     ResolvingFileIO resolvingFileIO = new ResolvingFileIO();
     Configuration hadoopConf = new Configuration();
-    ((HadoopConfigurable) resolvingFileIO).setConf(hadoopConf);
+    resolvingFileIO.setConf(hadoopConf);
     resolvingFileIO.initialize(
         ImmutableMap.of("io-impl", "org.apache.iceberg.hadoop.HadoopFileIO"));
     // configure delegation IO
@@ -85,7 +84,7 @@ public void resolveFileIOBulkDeletion() throws IOException {
     // bulk deletion
     List<String> randomFilePathString =
         randomFilePaths.stream().map(p -> p.toUri().toString()).collect(Collectors.toList());
-    ((SupportsBulkOperations) resolvingFileIO).deleteFiles(randomFilePathString);
+    resolvingFileIO.deleteFiles(randomFilePathString);

     for (String path : randomFilePathString) {
       assertThat(delegate.newInputFile(path).exists()).isFalse();

From 92b7c97d0625b6f1f244c04bf472f2a09a85d642 Mon Sep 17 00:00:00 2001
From: Steve Zhang
Date: Tue, 8 Aug 2023 15:48:22 -0700
Subject: [PATCH 4/8] Address Russell feedback and add unit test for non-bulk deletion

---
 .../apache/iceberg/io/ResolvingFileIO.java    | 21 ++++++++----
 .../apache/iceberg/io/TestResolvingIO.java    | 32 ++++++++++++++++++-
 2 files changed, 46 insertions(+), 7 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java b/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java
index 61b68398a752..fe00757b34a2 100644
--- a/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java
+++ b/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java
@@ -30,6 +30,7 @@
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.hadoop.HadoopConfigurable;
 import org.apache.iceberg.hadoop.SerializableConfiguration;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.Joiner;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -45,11 +46,17 @@ public class ResolvingFileIO implements FileIO, HadoopConfigurable, SupportsBulk
   private static final Logger LOG = LoggerFactory.getLogger(ResolvingFileIO.class);
   private static final String FALLBACK_IMPL = "org.apache.iceberg.hadoop.HadoopFileIO";
   private static final String S3_FILE_IO_IMPL = "org.apache.iceberg.aws.s3.S3FileIO";
+  private static final String IN_MEMORY_IO_IMPL = "org.apache.iceberg.inmemory.InMemoryFileIO";
   private static final Map<String, String> SCHEME_TO_FILE_IO =
       ImmutableMap.of(
-          "s3", S3_FILE_IO_IMPL,
-          "s3a", S3_FILE_IO_IMPL,
-          "s3n", S3_FILE_IO_IMPL);
+          "s3",
+          S3_FILE_IO_IMPL,
+          "s3a",
+          S3_FILE_IO_IMPL,
+          "s3n",
+          S3_FILE_IO_IMPL,
+          "inmemory",
+          IN_MEMORY_IO_IMPL);

   private final Map<String, FileIO> ioInstances = Maps.newHashMap();
   private final AtomicBoolean isClosed = new AtomicBoolean(false);
@@ -93,14 +100,15 @@ public void deleteFiles(Iterable pathsToDelete) throws BulkDeletionFailu
             .collect(Collectors.groupingBy(this::io));
     for (Map.Entry<FileIO, List<String>> entries : pathByFileIO.entrySet()) {
       FileIO io = entries.getKey();
+      List<String> filePaths = entries.getValue();
       if (io instanceof SupportsBulkOperations) {
-        ((SupportsBulkOperations) io).deleteFiles(entries.getValue());
+        ((SupportsBulkOperations) io).deleteFiles(filePaths);
       } else {
         LOG.warn(
             "IO {} does not support bulk operations. Using non-bulk deletes.",
             io.getClass().getName());
         Tasks.Builder<String> deleteTasks =
-            Tasks.foreach(entries.getValue())
+            Tasks.foreach(filePaths)
                 .noRetry()
                 .suppressFailureWhenFinished()
                 .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc));
@@ -153,7 +161,8 @@ public Configuration getConf() {
     return hadoopConf.get();
   }

-  private FileIO io(String location) {
+  @VisibleForTesting
+  FileIO io(String location) {
     String impl = implFromLocation(location);
     FileIO io = ioInstances.get(impl);
     if (io != null) {
diff --git a/core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java b/core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java
index 130197c93433..5aa57a2516d4 100644
--- a/core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java
+++ b/core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java
@@ -31,6 +31,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.TestHelpers;
 import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.inmemory.InMemoryFileIO;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -69,9 +70,9 @@ public void resolveFileIOBulkDeletion() throws IOException {
     resolvingFileIO.initialize(
         ImmutableMap.of("io-impl", "org.apache.iceberg.hadoop.HadoopFileIO"));
     // configure delegation IO
-    HadoopFileIO delegate = new HadoopFileIO(hadoopConf);
     FileSystem fs = FileSystem.getLocal(hadoopConf);
     Path parent = new Path(tempDir.toURI());
+    HadoopFileIO delegate = (HadoopFileIO) resolvingFileIO.io(parent.toString());
     // write
     List<Path> randomFilePaths =
         IntStream.range(1, 10)
@@ -90,4 +91,33 @@ public void resolveFileIOBulkDeletion() throws IOException {
       assertThat(delegate.newInputFile(path).exists()).isFalse();
     }
   }
+
+  @Test
+  public void resolveFileIONonBulkDeletion() {
+    // configure resolving fileIO
+    ResolvingFileIO resolvingFileIO = new ResolvingFileIO();
+    Configuration hadoopConf = new Configuration();
+    resolvingFileIO.setConf(hadoopConf);
+    resolvingFileIO.initialize(
+        ImmutableMap.of("io-impl", "org.apache.iceberg.inmemory.InMemoryFileIO"));
+    String parentPath = "inmemory://foo.db/bar";
+    InMemoryFileIO delegation = (InMemoryFileIO) resolvingFileIO.io(parentPath);
+    // write
+    byte[] someData = "some data".getBytes();
+
+    List<String> randomFilePaths =
+        IntStream.range(1, 10)
+            .mapToObj(i -> parentPath + "-" + i + "-" + UUID.randomUUID())
+            .collect(Collectors.toList());
+    for (String randomFilePath : randomFilePaths) {
+      delegation.addFile(randomFilePath, someData);
+      assertThat(delegation.fileExists(randomFilePath)).isTrue();
+    }
+    // non-bulk deletion
+    resolvingFileIO.deleteFiles(randomFilePaths);
+
+    for (String path : randomFilePaths) {
+      assertThat(delegation.fileExists(path)).isFalse();
+    }
+  }
 }

From 45dee7123521d67ee679956c61a4391029852b40 Mon Sep 17 00:00:00 2001
From: Steve Zhang
Date: Wed, 9 Aug 2023 14:18:59 -0700
Subject: [PATCH 5/8] Partition path to delete by 100k to avoid driver OOM

---
 .../apache/iceberg/io/ResolvingFileIO.java    | 62 +++++++++----------
 .../apache/iceberg/io/TestResolvingIO.java    | 16 +++--
 2 files changed, 42 insertions(+), 36 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java b/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java
index 6aded585136f..1af95d7e4332 100644
--- a/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java
+++ b/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java
@@ -24,7 +24,6 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
 import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.exceptions.ValidationException;
@@ -33,6 +32,7 @@
 import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.Joiner;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.util.SerializableMap;
@@ -44,22 +44,16 @@
 /** FileIO implementation that uses location scheme to choose the correct FileIO implementation. */
 public class ResolvingFileIO implements FileIO, HadoopConfigurable, SupportsBulkOperations {
   private static final Logger LOG = LoggerFactory.getLogger(ResolvingFileIO.class);
+  private static final Integer BATCH_SIZE = 100_000;
   private static final String FALLBACK_IMPL = "org.apache.iceberg.hadoop.HadoopFileIO";
   private static final String S3_FILE_IO_IMPL = "org.apache.iceberg.aws.s3.S3FileIO";
   private static final String GCS_FILE_IO_IMPL = "org.apache.iceberg.gcp.gcs.GCSFileIO";
-  private static final String IN_MEMORY_IO_IMPL = "org.apache.iceberg.inmemory.InMemoryFileIO";
   private static final Map<String, String> SCHEME_TO_FILE_IO =
       ImmutableMap.of(
-          "s3",
-          S3_FILE_IO_IMPL,
-          "s3a",
-          S3_FILE_IO_IMPL,
-          "s3n",
-          S3_FILE_IO_IMPL,
-          "gs",
-          GCS_FILE_IO_IMPL,
-          "inmemory",
-          IN_MEMORY_IO_IMPL);
+          "s3", S3_FILE_IO_IMPL,
+          "s3a", S3_FILE_IO_IMPL,
+          "s3n", S3_FILE_IO_IMPL,
+          "gs", GCS_FILE_IO_IMPL);

   private final Map<String, FileIO> ioInstances = Maps.newHashMap();
   private final AtomicBoolean isClosed = new AtomicBoolean(false);
@@ -98,26 +92,30 @@ public void deleteFile(String location) {

   @Override
   public void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailureException {
-    Map<FileIO, List<String>> pathByFileIO =
-        StreamSupport.stream(pathsToDelete.spliterator(), false)
-            .collect(Collectors.groupingBy(this::io));
-    for (Map.Entry<FileIO, List<String>> entries : pathByFileIO.entrySet()) {
-      FileIO io = entries.getKey();
-      List<String> filePaths = entries.getValue();
-      if (io instanceof SupportsBulkOperations) {
-        ((SupportsBulkOperations) io).deleteFiles(filePaths);
-      } else {
-        LOG.warn(
-            "IO {} does not support bulk operations. Using non-bulk deletes.",
-            io.getClass().getName());
-        Tasks.Builder<String> deleteTasks =
-            Tasks.foreach(filePaths)
-                .noRetry()
-                .suppressFailureWhenFinished()
-                .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc));
-        deleteTasks.run(io::deleteFile);
-      }
-    }
+    Iterators.partition(pathsToDelete.iterator(), BATCH_SIZE)
+        .forEachRemaining(
+            partitioned -> {
+              Map<FileIO, List<String>> pathByFileIO =
+                  partitioned.stream().collect(Collectors.groupingBy(this::io));
+              for (Map.Entry<FileIO, List<String>> entries : pathByFileIO.entrySet()) {
+                FileIO io = entries.getKey();
+                List<String> filePaths = entries.getValue();
+                if (io instanceof SupportsBulkOperations) {
+                  ((SupportsBulkOperations) io).deleteFiles(filePaths);
+                } else {
+                  LOG.warn(
+                      "IO {} does not support bulk operations. Using non-bulk deletes.",
+                      io.getClass().getName());
+                  Tasks.Builder<String> deleteTasks =
+                      Tasks.foreach(filePaths)
+                          .noRetry()
+                          .suppressFailureWhenFinished()
+                          .onFailure(
+                              (file, exc) -> LOG.warn("Failed to delete file: {}", file, exc));
+                  deleteTasks.run(io::deleteFile);
+                }
+              }
+            });
   }

   @Override
diff --git a/core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java b/core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java
index 5aa57a2516d4..3264aa44e595 100644
--- a/core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java
+++ b/core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java
@@ -19,6 +19,9 @@
 package org.apache.iceberg.io;

 import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;

 import java.io.File;
 import java.io.IOException;
@@ -69,10 +72,12 @@ public void resolveFileIOBulkDeletion() throws IOException {
     resolvingFileIO.setConf(hadoopConf);
     resolvingFileIO.initialize(
         ImmutableMap.of("io-impl", "org.apache.iceberg.hadoop.HadoopFileIO"));
+    ResolvingFileIO spy = spy(resolvingFileIO);
     // configure delegation IO
     FileSystem fs = FileSystem.getLocal(hadoopConf);
     Path parent = new Path(tempDir.toURI());
-    HadoopFileIO delegate = (HadoopFileIO) resolvingFileIO.io(parent.toString());
+    HadoopFileIO delegate = new HadoopFileIO(hadoopConf);
+    when(spy.io(anyString())).thenReturn(delegate);
     // write
     List<Path> randomFilePaths =
         IntStream.range(1, 10)
@@ -85,7 +90,7 @@ public void resolveFileIOBulkDeletion() throws IOException {
     // bulk deletion
     List<String> randomFilePathString =
         randomFilePaths.stream().map(p -> p.toUri().toString()).collect(Collectors.toList());
-    resolvingFileIO.deleteFiles(randomFilePathString);
+    spy.deleteFiles(randomFilePathString);

     for (String path : randomFilePathString) {
       assertThat(delegate.newInputFile(path).exists()).isFalse();
@@ -100,8 +105,11 @@ public void resolveFileIONonBulkDeletion() {
     resolvingFileIO.setConf(hadoopConf);
     resolvingFileIO.initialize(
         ImmutableMap.of("io-impl", "org.apache.iceberg.inmemory.InMemoryFileIO"));
+    ResolvingFileIO spy = spy(resolvingFileIO);
+    // configure delegation IO
     String parentPath = "inmemory://foo.db/bar";
-    InMemoryFileIO delegation = (InMemoryFileIO) resolvingFileIO.io(parentPath);
+    InMemoryFileIO delegation = new InMemoryFileIO();
+    when(spy.io(anyString())).thenReturn(delegation);
     // write
     byte[] someData = "some data".getBytes();

@@ -114,7 +122,7 @@ public void resolveFileIONonBulkDeletion() {
       assertThat(delegation.fileExists(randomFilePath)).isTrue();
     }
     // non-bulk deletion
-    resolvingFileIO.deleteFiles(randomFilePaths);
+    spy.deleteFiles(randomFilePaths);

     for (String path : randomFilePaths) {
       assertThat(delegation.fileExists(path)).isFalse();
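The batching added in this commit bounds how many paths are materialized at once: Iterators.partition wraps the incoming iterator lazily, so only one batch of at most BATCH_SIZE paths is held in memory while it is grouped by FileIO and deleted. A standalone sketch of that behavior, using plain Guava rather than Iceberg's relocated Guava and a toy batch size (both assumptions, purely for illustration):

    import com.google.common.collect.Iterators;
    import java.util.List;

    public class PartitionSketch {
      public static void main(String[] args) {
        Iterable<String> paths =
            List.of("s3://bucket/a", "s3://bucket/b", "s3://bucket/c", "s3://bucket/d", "s3://bucket/e");
        int batchSize = 2; // the patch uses 100_000

        // Iterators.partition is lazy: each List<String> batch is built only when
        // forEachRemaining asks for it, so a huge input never sits in memory all at once.
        Iterators.partition(paths.iterator(), batchSize)
            .forEachRemaining(batch -> System.out.println("deleting batch: " + batch));
      }
    }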
From 8af94059410a91d8c7e2d269d721b7bff80477e9 Mon Sep 17 00:00:00 2001
From: Steve Zhang
Date: Thu, 10 Aug 2023 13:11:50 -0700
Subject: [PATCH 6/8] Address feedback and rebase

---
 .../apache/iceberg/io/TestResolvingIO.java    | 59 +++++++------------
 1 file changed, 22 insertions(+), 37 deletions(-)

diff --git a/core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java b/core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java
index 66d37d509a0a..e95d1fb7904e 100644
--- a/core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java
+++ b/core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java
@@ -20,8 +20,8 @@

 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;

 import java.io.File;
 import java.io.IOException;
@@ -32,20 +32,15 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-//import java.nio.file.Path;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.TestHelpers;
 import org.apache.iceberg.hadoop.HadoopFileIO;
 import org.apache.iceberg.inmemory.InMemoryFileIO;
-import org.apache.iceberg.hadoop.HadoopFileIO;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;

 public class TestResolvingIO {

-  @TempDir private Path temp;
-
   @TempDir private File tempDir;

   @Test
@@ -66,16 +61,17 @@ public void testResolvingFileIOWithHadoopFileIOKryoSerialization() throws IOExce
     resolvingFileIO.setConf(conf);
     resolvingFileIO.initialize(ImmutableMap.of("k1", "v1"));

-    assertThat(resolvingFileIO.ioClass(temp.toString())).isEqualTo(HadoopFileIO.class);
-    assertThat(resolvingFileIO.newInputFile(temp.toString())).isNotNull();
+    assertThat(resolvingFileIO.ioClass(tempDir.getCanonicalPath())).isEqualTo(HadoopFileIO.class);
+    assertThat(resolvingFileIO.newInputFile(tempDir.getCanonicalPath())).isNotNull();

     ResolvingFileIO roundTripSerializedFileIO =
         TestHelpers.KryoHelpers.roundTripSerialize(resolvingFileIO);
     roundTripSerializedFileIO.setConf(conf);

     assertThat(roundTripSerializedFileIO.properties()).isEqualTo(resolvingFileIO.properties());
-    assertThat(roundTripSerializedFileIO.ioClass(temp.toString())).isEqualTo(HadoopFileIO.class);
-    assertThat(roundTripSerializedFileIO.newInputFile(temp.toString())).isNotNull();
+    assertThat(roundTripSerializedFileIO.ioClass(tempDir.getCanonicalPath()))
+        .isEqualTo(HadoopFileIO.class);
+    assertThat(roundTripSerializedFileIO.newInputFile(tempDir.getCanonicalPath())).isNotNull();
   }

   @Test
@@ -96,31 +92,27 @@ public void testResolvingFileIOWithHadoopFileIOJavaSerialization()
     resolvingFileIO.setConf(conf);
     resolvingFileIO.initialize(ImmutableMap.of("k1", "v1"));

-    assertThat(resolvingFileIO.ioClass(temp.toString())).isEqualTo(HadoopFileIO.class);
-    assertThat(resolvingFileIO.newInputFile(temp.toString())).isNotNull();
+    assertThat(resolvingFileIO.ioClass(tempDir.getCanonicalPath())).isEqualTo(HadoopFileIO.class);
+    assertThat(resolvingFileIO.newInputFile(tempDir.getCanonicalPath())).isNotNull();

     ResolvingFileIO roundTripSerializedFileIO = TestHelpers.roundTripSerialize(resolvingFileIO);
     roundTripSerializedFileIO.setConf(conf);

     assertThat(roundTripSerializedFileIO.properties()).isEqualTo(resolvingFileIO.properties());
-    assertThat(roundTripSerializedFileIO.ioClass(temp.toString())).isEqualTo(HadoopFileIO.class);
-    assertThat(roundTripSerializedFileIO.newInputFile(temp.toString())).isNotNull();
+    assertThat(roundTripSerializedFileIO.ioClass(tempDir.getCanonicalPath()))
+        .isEqualTo(HadoopFileIO.class);
+    assertThat(roundTripSerializedFileIO.newInputFile(tempDir.getCanonicalPath())).isNotNull();
   }

   @Test
   public void resolveFileIOBulkDeletion() throws IOException {
-    // configure resolving fileIO
-    ResolvingFileIO resolvingFileIO = new ResolvingFileIO();
+    ResolvingFileIO resolvingFileIO = spy(new ResolvingFileIO());
     Configuration hadoopConf = new Configuration();
-    resolvingFileIO.setConf(hadoopConf);
-    resolvingFileIO.initialize(
-        ImmutableMap.of("io-impl", "org.apache.iceberg.hadoop.HadoopFileIO"));
"org.apache.iceberg.hadoop.HadoopFileIO")); - ResolvingFileIO spy = spy(resolvingFileIO); - // configure delegation IO FileSystem fs = FileSystem.getLocal(hadoopConf); Path parent = new Path(tempDir.toURI()); - HadoopFileIO delegate = new HadoopFileIO(hadoopConf); - when(spy.io(anyString())).thenReturn(delegate); + // configure delegation IO + HadoopFileIO delegation = new HadoopFileIO(hadoopConf); + doReturn(delegation).when(resolvingFileIO).io(anyString()); // write List randomFilePaths = IntStream.range(1, 10) @@ -128,34 +120,27 @@ public void resolveFileIOBulkDeletion() throws IOException { .collect(Collectors.toList()); for (Path randomFilePath : randomFilePaths) { fs.createNewFile(randomFilePath); - assertThat(delegate.newInputFile(randomFilePath.toUri().toString()).exists()).isTrue(); + assertThat(delegation.newInputFile(randomFilePath.toUri().toString()).exists()).isTrue(); } // bulk deletion List randomFilePathString = randomFilePaths.stream().map(p -> p.toUri().toString()).collect(Collectors.toList()); - spy.deleteFiles(randomFilePathString); + resolvingFileIO.deleteFiles(randomFilePathString); for (String path : randomFilePathString) { - assertThat(delegate.newInputFile(path).exists()).isFalse(); + assertThat(delegation.newInputFile(path).exists()).isFalse(); } } @Test public void resolveFileIONonBulkDeletion() { - // configure resolving fileIO - ResolvingFileIO resolvingFileIO = new ResolvingFileIO(); - Configuration hadoopConf = new Configuration(); - resolvingFileIO.setConf(hadoopConf); - resolvingFileIO.initialize( - ImmutableMap.of("io-impl", "org.apache.iceberg.inmemory.InMemoryFileIO")); - ResolvingFileIO spy = spy(resolvingFileIO); - // configure delegation IO + ResolvingFileIO resolvingFileIO = spy(new ResolvingFileIO()); String parentPath = "inmemory://foo.db/bar"; + // configure delegation IO InMemoryFileIO delegation = new InMemoryFileIO(); - when(spy.io(anyString())).thenReturn(delegation); + doReturn(delegation).when(resolvingFileIO).io(anyString()); // write byte[] someData = "some data".getBytes(); - List randomFilePaths = IntStream.range(1, 10) .mapToObj(i -> parentPath + "-" + i + "-" + UUID.randomUUID()) @@ -165,7 +150,7 @@ public void resolveFileIONonBulkDeletion() { assertThat(delegation.fileExists(randomFilePath)).isTrue(); } // non-bulk deletion - spy.deleteFiles(randomFilePaths); + resolvingFileIO.deleteFiles(randomFilePaths); for (String path : randomFilePaths) { assertThat(delegation.fileExists(path)).isFalse(); From 76b4ef977ee325b9c908ecbf2a4d3abf4ec48318 Mon Sep 17 00:00:00 2001 From: Steve Zhang Date: Thu, 10 Aug 2023 16:53:40 -0700 Subject: [PATCH 7/8] Use primitive int for batch size --- core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java b/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java index aa17c6fdbc5b..63e223792276 100644 --- a/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java +++ b/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java @@ -44,7 +44,7 @@ /** FileIO implementation that uses location scheme to choose the correct FileIO implementation. 
 public class ResolvingFileIO implements FileIO, HadoopConfigurable, SupportsBulkOperations {
   private static final Logger LOG = LoggerFactory.getLogger(ResolvingFileIO.class);
-  private static final Integer BATCH_SIZE = 100_000;
+  private static final int BATCH_SIZE = 100_000;
   private static final String FALLBACK_IMPL = "org.apache.iceberg.hadoop.HadoopFileIO";
   private static final String S3_FILE_IO_IMPL = "org.apache.iceberg.aws.s3.S3FileIO";
   private static final String GCS_FILE_IO_IMPL = "org.apache.iceberg.gcp.gcs.GCSFileIO";

From 1443616f0d1989b09709732f7e78d8e254d4d31f Mon Sep 17 00:00:00 2001
From: Steve Zhang
Date: Fri, 11 Aug 2023 10:50:45 -0700
Subject: [PATCH 8/8] Simplify tempDir usage in TestResolvingIO

---
 .../apache/iceberg/io/TestResolvingIO.java    | 23 ++++++++-----------
 1 file changed, 10 insertions(+), 13 deletions(-)

diff --git a/core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java b/core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java
index e95d1fb7904e..ff1499bff777 100644
--- a/core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java
+++ b/core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java
@@ -23,7 +23,6 @@
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;

-import java.io.File;
 import java.io.IOException;
 import java.util.List;
 import java.util.UUID;
@@ -41,7 +40,7 @@

 public class TestResolvingIO {

-  @TempDir private File tempDir;
+  @TempDir private java.nio.file.Path temp;

   @Test
   public void testResolvingFileIOKryoSerialization() throws IOException {
@@ -61,17 +60,16 @@ public void testResolvingFileIOWithHadoopFileIOKryoSerialization() throws IOExce
     resolvingFileIO.setConf(conf);
     resolvingFileIO.initialize(ImmutableMap.of("k1", "v1"));

-    assertThat(resolvingFileIO.ioClass(tempDir.getCanonicalPath())).isEqualTo(HadoopFileIO.class);
-    assertThat(resolvingFileIO.newInputFile(tempDir.getCanonicalPath())).isNotNull();
+    assertThat(resolvingFileIO.ioClass(temp.toString())).isEqualTo(HadoopFileIO.class);
+    assertThat(resolvingFileIO.newInputFile(temp.toString())).isNotNull();

     ResolvingFileIO roundTripSerializedFileIO =
         TestHelpers.KryoHelpers.roundTripSerialize(resolvingFileIO);
     roundTripSerializedFileIO.setConf(conf);

     assertThat(roundTripSerializedFileIO.properties()).isEqualTo(resolvingFileIO.properties());
-    assertThat(roundTripSerializedFileIO.ioClass(tempDir.getCanonicalPath()))
-        .isEqualTo(HadoopFileIO.class);
+    assertThat(roundTripSerializedFileIO.ioClass(temp.toString())).isEqualTo(HadoopFileIO.class);
+    assertThat(roundTripSerializedFileIO.newInputFile(temp.toString())).isNotNull();
   }

   @Test
@@ -92,16 +90,15 @@ public void testResolvingFileIOWithHadoopFileIOJavaSerialization()
     resolvingFileIO.setConf(conf);
     resolvingFileIO.initialize(ImmutableMap.of("k1", "v1"));

-    assertThat(resolvingFileIO.ioClass(tempDir.getCanonicalPath())).isEqualTo(HadoopFileIO.class);
-    assertThat(resolvingFileIO.newInputFile(tempDir.getCanonicalPath())).isNotNull();
+    assertThat(resolvingFileIO.ioClass(temp.toString())).isEqualTo(HadoopFileIO.class);
+    assertThat(resolvingFileIO.newInputFile(temp.toString())).isNotNull();

     ResolvingFileIO roundTripSerializedFileIO = TestHelpers.roundTripSerialize(resolvingFileIO);
     roundTripSerializedFileIO.setConf(conf);

     assertThat(roundTripSerializedFileIO.properties()).isEqualTo(resolvingFileIO.properties());
-    assertThat(roundTripSerializedFileIO.ioClass(tempDir.getCanonicalPath()))
-        .isEqualTo(HadoopFileIO.class);
+    assertThat(roundTripSerializedFileIO.ioClass(temp.toString())).isEqualTo(HadoopFileIO.class);
+    assertThat(roundTripSerializedFileIO.newInputFile(temp.toString())).isNotNull();
   }

   @Test
@@ -109,7 +106,7 @@ public void resolveFileIOBulkDeletion() throws IOException {
     ResolvingFileIO resolvingFileIO = spy(new ResolvingFileIO());
     Configuration hadoopConf = new Configuration();
     FileSystem fs = FileSystem.getLocal(hadoopConf);
-    Path parent = new Path(tempDir.toURI());
+    Path parent = new Path(temp.toUri());
     // configure delegation IO
     HadoopFileIO delegation = new HadoopFileIO(hadoopConf);
     doReturn(delegation).when(resolvingFileIO).io(anyString());
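One note on the Mockito pattern the final tests settle on: on a spy, when(spy.io(anyString())) invokes the real io() while the stub is being configured, whereas doReturn(delegation).when(spy).io(anyString()) installs the stub without calling the real method, which is why patch 6 switches to doReturn. A minimal sketch of that pattern (the helper class below is illustrative, not part of the patches; it sits in org.apache.iceberg.io only so the package-private io(String) is visible, as the test does):

    package org.apache.iceberg.io;

    import static org.mockito.ArgumentMatchers.anyString;
    import static org.mockito.Mockito.doReturn;
    import static org.mockito.Mockito.spy;

    import org.apache.iceberg.hadoop.HadoopFileIO;

    public class SpyStubbingSketch {
      // Returns a ResolvingFileIO spy whose io(String) always resolves to the given delegate.
      static ResolvingFileIO stubbedResolvingIO(HadoopFileIO delegate) {
        ResolvingFileIO resolvingFileIO = spy(new ResolvingFileIO());
        // doReturn(...).when(...) stubs io(String) without invoking the real method,
        // so no scheme resolution or delegate FileIO instantiation happens during setup.
        doReturn(delegate).when(resolvingFileIO).io(anyString());
        return resolvingFileIO;
      }
    }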