diff --git a/api/src/main/java/org/apache/iceberg/io/DelegateFileIO.java b/api/src/main/java/org/apache/iceberg/io/DelegateFileIO.java new file mode 100644 index 000000000000..4b97b3cb2874 --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/io/DelegateFileIO.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.io; + +/** + * This interface is intended as an extension for FileIO implementations that support being a + * delegate target, which requires supporting both prefix and bulk operations. 
+ */ +public interface DelegateFileIO extends FileIO, SupportsPrefixOperations, SupportsBulkOperations {} diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java index fa7590608880..dd13e13f01a6 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java @@ -33,12 +33,10 @@ import org.apache.iceberg.common.DynConstructors; import org.apache.iceberg.io.BulkDeletionFailureException; import org.apache.iceberg.io.CredentialSupplier; -import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.DelegateFileIO; import org.apache.iceberg.io.FileInfo; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; -import org.apache.iceberg.io.SupportsBulkOperations; -import org.apache.iceberg.io.SupportsPrefixOperations; import org.apache.iceberg.metrics.MetricsContext; import org.apache.iceberg.relocated.com.google.common.base.Joiner; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -75,8 +73,7 @@ * schemes s3a, s3n, https are also treated as s3 file paths. Using this FileIO with other schemes * will result in {@link org.apache.iceberg.exceptions.ValidationException}. 
*/ -public class S3FileIO - implements FileIO, SupportsBulkOperations, SupportsPrefixOperations, CredentialSupplier { +public class S3FileIO implements CredentialSupplier, DelegateFileIO { private static final Logger LOG = LoggerFactory.getLogger(S3FileIO.class); private static final String DEFAULT_METRICS_IMPL = "org.apache.iceberg.hadoop.HadoopMetricsContext"; diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIO.java b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIO.java index 761cd514316c..a74e574c9751 100644 --- a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIO.java +++ b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIO.java @@ -35,12 +35,14 @@ import java.util.Random; import java.util.UUID; import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.BaseTable; import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.Schema; import org.apache.iceberg.TestHelpers; import org.apache.iceberg.aws.AwsProperties; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.common.DynMethods; import org.apache.iceberg.exceptions.NotFoundException; import org.apache.iceberg.io.BulkDeletionFailureException; import org.apache.iceberg.io.FileIO; @@ -48,6 +50,7 @@ import org.apache.iceberg.io.IOUtil; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.io.ResolvingFileIO; import org.apache.iceberg.jdbc.JdbcCatalog; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; @@ -361,6 +364,19 @@ public void testS3FileIOJavaSerialization() throws IOException, ClassNotFoundExc .isEqualTo(testS3FileIO.properties()); } + @Test + public void testResolvingFileIOLoad() { + ResolvingFileIO resolvingFileIO = new ResolvingFileIO(); + resolvingFileIO.setConf(new Configuration()); + 
resolvingFileIO.initialize(ImmutableMap.of()); + FileIO result = + DynMethods.builder("io") + .hiddenImpl(ResolvingFileIO.class, String.class) + .build(resolvingFileIO) + .invoke("s3://foo/bar"); + Assertions.assertThat(result).isInstanceOf(S3FileIO.class); + } + private void createRandomObjects(String prefix, int count) { S3URI s3URI = new S3URI(prefix); diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java index 04ead0bd6791..7aaa2b6a75b1 100644 --- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java +++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java @@ -32,12 +32,10 @@ import org.apache.hadoop.fs.RemoteIterator; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.io.BulkDeletionFailureException; -import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.DelegateFileIO; import org.apache.iceberg.io.FileInfo; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; -import org.apache.iceberg.io.SupportsBulkOperations; -import org.apache.iceberg.io.SupportsPrefixOperations; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Streams; import org.apache.iceberg.util.SerializableMap; @@ -47,8 +45,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class HadoopFileIO - implements FileIO, HadoopConfigurable, SupportsPrefixOperations, SupportsBulkOperations { +public class HadoopFileIO implements HadoopConfigurable, DelegateFileIO { private static final Logger LOG = LoggerFactory.getLogger(HadoopFileIO.class); private static final String DELETE_FILE_PARALLELISM = "iceberg.hadoop.delete-file-parallelism"; diff --git a/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java b/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java index 58c0586b7e95..076037b577f7 100644 --- 
a/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java +++ b/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java @@ -31,18 +31,22 @@ import org.apache.iceberg.hadoop.SerializableConfiguration; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.Joiner; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Iterators; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.util.SerializableMap; import org.apache.iceberg.util.SerializableSupplier; -import org.apache.iceberg.util.Tasks; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** FileIO implementation that uses location scheme to choose the correct FileIO implementation. */ -public class ResolvingFileIO implements FileIO, HadoopConfigurable, SupportsBulkOperations { +/** + * FileIO implementation that uses location scheme to choose the correct FileIO implementation. + * Delegate FileIO implementations must implement the {@link DelegateFileIO} mixin interface; + * otherwise, resolving a location to such a delegate fails with {@link IllegalStateException}. 
+ */ +public class ResolvingFileIO implements HadoopConfigurable, DelegateFileIO { private static final Logger LOG = LoggerFactory.getLogger(ResolvingFileIO.class); private static final int BATCH_SIZE = 100_000; private static final String FALLBACK_IMPL = "org.apache.iceberg.hadoop.HadoopFileIO"; @@ -55,7 +59,7 @@ public class ResolvingFileIO implements FileIO, HadoopConfigurable, SupportsBulk "s3n", S3_FILE_IO_IMPL, "gs", GCS_FILE_IO_IMPL); - private final Map ioInstances = Maps.newConcurrentMap(); + private final Map ioInstances = Maps.newConcurrentMap(); private final AtomicBoolean isClosed = new AtomicBoolean(false); private final transient StackTraceElement[] createStack; private SerializableMap properties; @@ -95,25 +99,12 @@ public void deleteFiles(Iterable pathsToDelete) throws BulkDeletionFailu Iterators.partition(pathsToDelete.iterator(), BATCH_SIZE) .forEachRemaining( partitioned -> { - Map> pathByFileIO = + Map> pathByFileIO = partitioned.stream().collect(Collectors.groupingBy(this::io)); - for (Map.Entry> entries : pathByFileIO.entrySet()) { - FileIO io = entries.getKey(); + for (Map.Entry> entries : pathByFileIO.entrySet()) { + DelegateFileIO io = entries.getKey(); List filePaths = entries.getValue(); - if (io instanceof SupportsBulkOperations) { - ((SupportsBulkOperations) io).deleteFiles(filePaths); - } else { - LOG.warn( - "IO {} does not support bulk operations. 
Using non-bulk deletes.", - io.getClass().getName()); - Tasks.Builder deleteTasks = - Tasks.foreach(filePaths) - .noRetry() - .suppressFailureWhenFinished() - .onFailure( - (file, exc) -> LOG.warn("Failed to delete file: {}", file, exc)); - deleteTasks.run(io::deleteFile); - } + io.deleteFiles(filePaths); } }); } @@ -133,12 +124,12 @@ public void initialize(Map newProperties) { @Override public void close() { if (isClosed.compareAndSet(false, true)) { - List instances = Lists.newArrayList(); + List instances = Lists.newArrayList(); instances.addAll(ioInstances.values()); ioInstances.clear(); - for (FileIO io : instances) { + for (DelegateFileIO io : instances) { io.close(); } } @@ -161,9 +152,9 @@ public Configuration getConf() { } @VisibleForTesting - FileIO io(String location) { + DelegateFileIO io(String location) { String impl = implFromLocation(location); - FileIO io = ioInstances.get(impl); + DelegateFileIO io = ioInstances.get(impl); if (io != null) { if (io instanceof HadoopConfigurable && ((HadoopConfigurable) io).getConf() == null) { synchronized (io) { @@ -181,13 +172,14 @@ FileIO io(String location) { impl, key -> { Configuration conf = hadoopConf.get(); + FileIO fileIO; try { Map props = Maps.newHashMap(properties); // ResolvingFileIO is keeping track of the creation stacktrace, so no need to do the // same in S3FileIO. 
props.put("init-creation-stacktrace", "false"); - return CatalogUtil.loadFileIO(key, props, conf); + fileIO = CatalogUtil.loadFileIO(key, props, conf); } catch (IllegalArgumentException e) { if (key.equals(FALLBACK_IMPL)) { // no implementation to fall back to, throw the exception @@ -200,7 +192,7 @@ FileIO io(String location) { FALLBACK_IMPL, e); try { - return CatalogUtil.loadFileIO(FALLBACK_IMPL, properties, conf); + fileIO = CatalogUtil.loadFileIO(FALLBACK_IMPL, properties, conf); } catch (IllegalArgumentException suppressed) { LOG.warn( "Failed to load FileIO implementation: {} (fallback)", @@ -213,10 +205,17 @@ FileIO io(String location) { } } } + + Preconditions.checkState( + fileIO instanceof DelegateFileIO, + "FileIO does not implement DelegateFileIO: " + fileIO.getClass().getName()); + + return (DelegateFileIO) fileIO; }); } - private static String implFromLocation(String location) { + @VisibleForTesting + String implFromLocation(String location) { return SCHEME_TO_FILE_IO.getOrDefault(scheme(location), FALLBACK_IMPL); } @@ -252,4 +251,14 @@ protected void finalize() throws Throwable { } } } + + @Override + public Iterable listPrefix(String prefix) { + return io(prefix).listPrefix(prefix); + } + + @Override + public void deletePrefix(String prefix) { + io(prefix).deletePrefix(prefix); + } } diff --git a/core/src/test/java/org/apache/iceberg/hadoop/HadoopFileIOTest.java b/core/src/test/java/org/apache/iceberg/hadoop/HadoopFileIOTest.java index 636c94069d5c..0a195819e17c 100644 --- a/core/src/test/java/org/apache/iceberg/hadoop/HadoopFileIOTest.java +++ b/core/src/test/java/org/apache/iceberg/hadoop/HadoopFileIOTest.java @@ -30,8 +30,10 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.iceberg.TestHelpers; +import org.apache.iceberg.common.DynMethods; import org.apache.iceberg.io.BulkDeletionFailureException; import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.ResolvingFileIO; import 
org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Streams; @@ -168,6 +170,19 @@ public void testHadoopFileIOJavaSerialization() throws IOException, ClassNotFoun .isEqualTo(testHadoopFileIO.properties()); } + @Test + public void testResolvingFileIOLoad() { + ResolvingFileIO resolvingFileIO = new ResolvingFileIO(); + resolvingFileIO.setConf(new Configuration()); + resolvingFileIO.initialize(ImmutableMap.of()); + FileIO result = + DynMethods.builder("io") + .hiddenImpl(ResolvingFileIO.class, String.class) + .build(resolvingFileIO) + .invoke("hdfs://foo/bar"); + Assertions.assertThat(result).isInstanceOf(HadoopFileIO.class); + } + private List createRandomFiles(Path parent, int count) { Vector paths = new Vector<>(); random diff --git a/core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java b/core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java index ff1499bff777..f072053eea9e 100644 --- a/core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java +++ b/core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java @@ -19,9 +19,13 @@ package org.apache.iceberg.io; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.withSettings; import java.io.IOException; import java.util.List; @@ -33,7 +37,6 @@ import org.apache.hadoop.fs.Path; import org.apache.iceberg.TestHelpers; import org.apache.iceberg.hadoop.HadoopFileIO; -import org.apache.iceberg.inmemory.InMemoryFileIO; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.junit.jupiter.api.Test; 
import org.junit.jupiter.api.io.TempDir; @@ -130,27 +133,53 @@ public void resolveFileIOBulkDeletion() throws IOException { } @Test - public void resolveFileIONonBulkDeletion() { + public void delegateFileIOWithPrefixBasedSupport() throws IOException { ResolvingFileIO resolvingFileIO = spy(new ResolvingFileIO()); - String parentPath = "inmemory://foo.db/bar"; - // configure delegation IO - InMemoryFileIO delegation = new InMemoryFileIO(); - doReturn(delegation).when(resolvingFileIO).io(anyString()); - // write - byte[] someData = "some data".getBytes(); - List randomFilePaths = + Configuration hadoopConf = new Configuration(); + FileSystem fs = FileSystem.getLocal(hadoopConf); + Path parent = new Path(temp.toUri()); + HadoopFileIO delegate = new HadoopFileIO(hadoopConf); + doReturn(delegate).when(resolvingFileIO).io(anyString()); + + List paths = IntStream.range(1, 10) - .mapToObj(i -> parentPath + "-" + i + "-" + UUID.randomUUID()) + .mapToObj(i -> new Path(parent, "random-" + i + "-" + UUID.randomUUID())) .collect(Collectors.toList()); - for (String randomFilePath : randomFilePaths) { - delegation.addFile(randomFilePath, someData); - assertThat(delegation.fileExists(randomFilePath)).isTrue(); + for (Path path : paths) { + fs.createNewFile(path); + assertThat(delegate.newInputFile(path.toString()).exists()).isTrue(); } - // non-bulk deletion - resolvingFileIO.deleteFiles(randomFilePaths); - for (String path : randomFilePaths) { - assertThat(delegation.fileExists(path)).isFalse(); - } + paths.stream() + .map(Path::toString) + .forEach( + path -> { + // HadoopFileIO can only list prefixes that match the full path + assertThat(resolvingFileIO.listPrefix(path)).hasSize(1); + resolvingFileIO.deletePrefix(path); + assertThat(delegate.newInputFile(path).exists()).isFalse(); + }); + } + + @Test + public void delegateFileIOWithAndWithoutMixins() { + ResolvingFileIO resolvingFileIO = spy(new ResolvingFileIO()); + resolvingFileIO.setConf(new Configuration()); + 
resolvingFileIO.initialize(ImmutableMap.of()); + + String fileIONoMixins = mock(FileIO.class).getClass().getName(); + doReturn(fileIONoMixins).when(resolvingFileIO).implFromLocation(any()); + assertThatThrownBy(() -> resolvingFileIO.newInputFile("/file")) + .isInstanceOf(IllegalStateException.class) + .hasMessageStartingWith( + "FileIO does not implement DelegateFileIO: org.apache.iceberg.io.FileIO"); + + String fileIOWithMixins = + mock(FileIO.class, withSettings().extraInterfaces(DelegateFileIO.class)) + .getClass() + .getName(); + doReturn(fileIOWithMixins).when(resolvingFileIO).implFromLocation(any()); + // being null is ok here as long as the code doesn't throw an exception + assertThat(resolvingFileIO.newInputFile("/file")).isNull(); } } diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java index 1e164f059dad..09eb4a74008d 100644 --- a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java +++ b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java @@ -30,12 +30,10 @@ import org.apache.iceberg.common.DynConstructors; import org.apache.iceberg.gcp.GCPProperties; import org.apache.iceberg.io.BulkDeletionFailureException; -import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.DelegateFileIO; import org.apache.iceberg.io.FileInfo; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; -import org.apache.iceberg.io.SupportsBulkOperations; -import org.apache.iceberg.io.SupportsPrefixOperations; import org.apache.iceberg.metrics.MetricsContext; import org.apache.iceberg.relocated.com.google.common.collect.Iterators; import org.apache.iceberg.relocated.com.google.common.collect.Streams; @@ -56,7 +54,7 @@ *

See Cloud Storage * Overview */ -public class GCSFileIO implements FileIO, SupportsBulkOperations, SupportsPrefixOperations { +public class GCSFileIO implements DelegateFileIO { private static final Logger LOG = LoggerFactory.getLogger(GCSFileIO.class); private static final String DEFAULT_METRICS_IMPL = "org.apache.iceberg.hadoop.HadoopMetricsContext"; diff --git a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSFileIOTest.java b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSFileIOTest.java index 013c4d4955a2..50aa735605b4 100644 --- a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSFileIOTest.java +++ b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSFileIOTest.java @@ -35,12 +35,15 @@ import java.util.List; import java.util.Random; import java.util.stream.StreamSupport; +import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.TestHelpers; +import org.apache.iceberg.common.DynMethods; import org.apache.iceberg.gcp.GCPProperties; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.IOUtil; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.io.ResolvingFileIO; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -206,4 +209,17 @@ public void testGCSFileIOJavaSerialization() throws IOException, ClassNotFoundEx assertThat(testGCSFileIO.properties()).isEqualTo(roundTripSerializedFileIO.properties()); } + + @Test + public void testResolvingFileIOLoad() { + ResolvingFileIO resolvingFileIO = new ResolvingFileIO(); + resolvingFileIO.setConf(new Configuration()); + resolvingFileIO.initialize(ImmutableMap.of()); + FileIO result = + DynMethods.builder("io") + .hiddenImpl(ResolvingFileIO.class, String.class) + .build(resolvingFileIO) + .invoke("gs://foo/bar"); + assertThat(result).isInstanceOf(GCSFileIO.class); + } }