diff --git a/api/src/main/java/org/apache/iceberg/io/DelegateFileIO.java b/api/src/main/java/org/apache/iceberg/io/DelegateFileIO.java
new file mode 100644
index 000000000000..4b97b3cb2874
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/io/DelegateFileIO.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.io;
+
+/**
+ * This interface is intended as an extension for FileIO implementations that support being a
+ * delegate target.
+ */
+public interface DelegateFileIO extends FileIO, SupportsPrefixOperations, SupportsBulkOperations {}
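The new mixin adds no methods of its own; it only bundles FileIO with the existing prefix and bulk-operation interfaces so that ResolvingFileIO can require all three at once. A minimal sketch of a third-party implementation opting in (hypothetical class name, method bodies elided, not part of the patch):

```java
import org.apache.iceberg.io.BulkDeletionFailureException;
import org.apache.iceberg.io.DelegateFileIO;
import org.apache.iceberg.io.FileInfo;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;

// Hypothetical example: a custom FileIO becomes usable as a ResolvingFileIO delegate simply by
// implementing DelegateFileIO, i.e. the FileIO + SupportsPrefixOperations + SupportsBulkOperations
// contracts shown below.
public class ExampleDelegateFileIO implements DelegateFileIO {

  // FileIO contract
  @Override
  public InputFile newInputFile(String path) {
    throw new UnsupportedOperationException("elided in this sketch");
  }

  @Override
  public OutputFile newOutputFile(String path) {
    throw new UnsupportedOperationException("elided in this sketch");
  }

  @Override
  public void deleteFile(String path) {
    throw new UnsupportedOperationException("elided in this sketch");
  }

  // SupportsPrefixOperations contract
  @Override
  public Iterable<FileInfo> listPrefix(String prefix) {
    throw new UnsupportedOperationException("elided in this sketch");
  }

  @Override
  public void deletePrefix(String prefix) {
    throw new UnsupportedOperationException("elided in this sketch");
  }

  // SupportsBulkOperations contract
  @Override
  public void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailureException {
    throw new UnsupportedOperationException("elided in this sketch");
  }
}
```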
diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
index 93793157e2e1..bac5b41bf8dd 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
@@ -33,6 +33,7 @@
 import org.apache.iceberg.common.DynConstructors;
 import org.apache.iceberg.io.BulkDeletionFailureException;
 import org.apache.iceberg.io.CredentialSupplier;
+import org.apache.iceberg.io.DelegateFileIO;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.FileInfo;
 import org.apache.iceberg.io.InputFile;
@@ -76,7 +77,11 @@
  * will result in {@link org.apache.iceberg.exceptions.ValidationException}.
  */
 public class S3FileIO
-    implements FileIO, SupportsBulkOperations, SupportsPrefixOperations, CredentialSupplier {
+    implements FileIO,
+        SupportsBulkOperations,
+        SupportsPrefixOperations,
+        CredentialSupplier,
+        DelegateFileIO {
   private static final Logger LOG = LoggerFactory.getLogger(S3FileIO.class);
   private static final String DEFAULT_METRICS_IMPL =
       "org.apache.iceberg.hadoop.HadoopMetricsContext";
diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIO.java b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIO.java
index 761cd514316c..a74e574c9751
--- a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIO.java
+++ b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIO.java
@@ -35,12 +35,14 @@
 import java.util.Random;
 import java.util.UUID;
 import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.TestHelpers;
 import org.apache.iceberg.aws.AwsProperties;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.common.DynMethods;
 import org.apache.iceberg.exceptions.NotFoundException;
 import org.apache.iceberg.io.BulkDeletionFailureException;
 import org.apache.iceberg.io.FileIO;
@@ -48,6 +50,7 @@
 import org.apache.iceberg.io.IOUtil;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.ResolvingFileIO;
 import org.apache.iceberg.jdbc.JdbcCatalog;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -361,6 +364,19 @@ public void testS3FileIOJavaSerialization() throws IOException, ClassNotFoundExc
         .isEqualTo(testS3FileIO.properties());
   }
 
+  @Test
+  public void testResolvingFileIOLoad() {
+    ResolvingFileIO resolvingFileIO = new ResolvingFileIO();
+    resolvingFileIO.setConf(new Configuration());
+    resolvingFileIO.initialize(ImmutableMap.of());
+    FileIO result =
+        DynMethods.builder("io")
+            .hiddenImpl(ResolvingFileIO.class, String.class)
+            .build(resolvingFileIO)
+            .invoke("s3://foo/bar");
+    Assertions.assertThat(result).isInstanceOf(S3FileIO.class);
+  }
+
   private void createRandomObjects(String prefix, int count) {
     S3URI s3URI = new S3URI(prefix);
diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java
index 04ead0bd6791..ad23ae7912ab 100644
--- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java
+++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java
@@ -32,6 +32,7 @@
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.io.BulkDeletionFailureException;
+import org.apache.iceberg.io.DelegateFileIO;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.FileInfo;
 import org.apache.iceberg.io.InputFile;
@@ -48,7 +49,11 @@
 import org.slf4j.LoggerFactory;
 
 public class HadoopFileIO
-    implements FileIO, HadoopConfigurable, SupportsPrefixOperations, SupportsBulkOperations {
+    implements FileIO,
+        HadoopConfigurable,
+        SupportsPrefixOperations,
+        SupportsBulkOperations,
+        DelegateFileIO {
   private static final Logger LOG = LoggerFactory.getLogger(HadoopFileIO.class);
   private static final String DELETE_FILE_PARALLELISM = "iceberg.hadoop.delete-file-parallelism";
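Outside of tests, ResolvingFileIO is normally selected through the io-impl catalog property (CatalogProperties.FILE_IO_IMPL) and loaded reflectively, using the same CatalogUtil.loadFileIO mechanism that ResolvingFileIO itself uses further down in this patch to instantiate its delegates. A minimal sketch, with placeholder property values:

```java
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

public class LoadResolvingFileIOExample {

  // Sketch: load ResolvingFileIO the way a catalog would, via the io-impl property.
  public static FileIO load() {
    Map<String, String> props =
        ImmutableMap.of(CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.io.ResolvingFileIO");
    // loadFileIO instantiates the named implementation and calls initialize(props);
    // ResolvingFileIO then creates S3FileIO / HadoopFileIO lazily per location scheme.
    return CatalogUtil.loadFileIO(
        props.get(CatalogProperties.FILE_IO_IMPL), props, new Configuration());
  }
}
```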
diff --git a/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java b/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java
index 28d07bcf249e..622396521ef3 100644
--- a/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java
+++ b/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java
@@ -19,6 +19,7 @@
 package org.apache.iceberg.io;
 
 import java.util.Arrays;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -28,17 +29,26 @@
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.hadoop.HadoopConfigurable;
 import org.apache.iceberg.hadoop.SerializableConfiguration;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.PeekingIterator;
 import org.apache.iceberg.util.SerializableMap;
 import org.apache.iceberg.util.SerializableSupplier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/** FileIO implementation that uses location scheme to choose the correct FileIO implementation. */
-public class ResolvingFileIO implements FileIO, HadoopConfigurable {
+/**
+ * FileIO implementation that uses location scheme to choose the correct FileIO implementation.
+ * Delegate FileIO implementations must implement the {@link DelegateFileIO} mixin interface;
+ * otherwise initialization will fail.
+ */
+public class ResolvingFileIO
+    implements FileIO, SupportsPrefixOperations, SupportsBulkOperations, HadoopConfigurable {
   private static final Logger LOG = LoggerFactory.getLogger(ResolvingFileIO.class);
   private static final String FALLBACK_IMPL = "org.apache.iceberg.hadoop.HadoopFileIO";
   private static final String S3_FILE_IO_IMPL = "org.apache.iceberg.aws.s3.S3FileIO";
@@ -50,7 +60,7 @@ public class ResolvingFileIO implements FileIO, HadoopConfigurable {
           "s3n", S3_FILE_IO_IMPL,
           "gs", GCS_FILE_IO_IMPL);
 
-  private final Map<String, FileIO> ioInstances = Maps.newHashMap();
+  private final Map<String, DelegateFileIO> ioInstances = Maps.newHashMap();
   private final AtomicBoolean isClosed = new AtomicBoolean(false);
   private final transient StackTraceElement[] createStack;
   private SerializableMap<String, String> properties;
@@ -85,6 +95,29 @@ public void deleteFile(String location) {
     io(location).deleteFile(location);
   }
 
+  @Override
+  public void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailureException {
+    // peek at the first element to determine the type of FileIO
+    Iterator<String> originalIterator = pathsToDelete.iterator();
+    if (!originalIterator.hasNext()) {
+      return;
+    }
+
+    PeekingIterator<String> iterator = Iterators.peekingIterator(originalIterator);
+    DelegateFileIO fileIO = io(iterator.peek());
+    fileIO.deleteFiles(() -> iterator);
+  }
+
+  @Override
+  public Iterable<FileInfo> listPrefix(String prefix) {
+    return io(prefix).listPrefix(prefix);
+  }
+
+  @Override
+  public void deletePrefix(String prefix) {
+    io(prefix).deletePrefix(prefix);
+  }
+
   @Override
   public Map<String, String> properties() {
     return properties.immutableMap();
@@ -100,14 +133,14 @@ public void initialize(Map<String, String> newProperties) {
   @Override
   public void close() {
     if (isClosed.compareAndSet(false, true)) {
-      List<FileIO> instances = Lists.newArrayList();
+      List<DelegateFileIO> instances = Lists.newArrayList();
 
       synchronized (ioInstances) {
         instances.addAll(ioInstances.values());
         ioInstances.clear();
       }
 
-      for (FileIO io : instances) {
+      for (DelegateFileIO io : instances) {
         io.close();
       }
     }
@@ -129,14 +162,15 @@ public Configuration getConf() {
     return hadoopConf.get();
   }
 
-  private FileIO io(String location) {
+  private DelegateFileIO io(String location) {
     String impl = implFromLocation(location);
-    FileIO io = ioInstances.get(impl);
+    DelegateFileIO io = ioInstances.get(impl);
     if (io != null) {
       return io;
     }
 
     synchronized (ioInstances) {
+      // double check while holding the lock
       io = ioInstances.get(impl);
       if (io != null) {
@@ -145,12 +179,13 @@ private FileIO io(String location) {
 
       Configuration conf = hadoopConf.get();
+      FileIO newFileIO;
       try {
         Map<String, String> props = Maps.newHashMap(properties);
         // ResolvingFileIO is keeping track of the creation stacktrace, so no need to do the same in
         // S3FileIO.
         props.put("init-creation-stacktrace", "false");
-        io = CatalogUtil.loadFileIO(impl, props, conf);
+        newFileIO = CatalogUtil.loadFileIO(impl, props, conf);
       } catch (IllegalArgumentException e) {
         if (impl.equals(FALLBACK_IMPL)) {
           // no implementation to fall back to, throw the exception
@@ -163,7 +198,7 @@
               FALLBACK_IMPL,
               e);
           try {
-            io = CatalogUtil.loadFileIO(FALLBACK_IMPL, properties, conf);
+            newFileIO = CatalogUtil.loadFileIO(FALLBACK_IMPL, properties, conf);
           } catch (IllegalArgumentException suppressed) {
             LOG.warn(
                 "Failed to load FileIO implementation: {} (fallback)", FALLBACK_IMPL, suppressed);
@@ -175,13 +210,19 @@
         }
       }
 
+      Preconditions.checkState(
+          newFileIO instanceof DelegateFileIO,
+          "FileIO does not implement DelegateFileIO: " + newFileIO.getClass().getName());
+
+      io = (DelegateFileIO) newFileIO;
       ioInstances.put(impl, io);
     }
 
     return io;
   }
 
-  private static String implFromLocation(String location) {
+  @VisibleForTesting
+  String implFromLocation(String location) {
     return SCHEME_TO_FILE_IO.getOrDefault(scheme(location), FALLBACK_IMPL);
   }
diff --git a/core/src/test/java/org/apache/iceberg/hadoop/HadoopFileIOTest.java b/core/src/test/java/org/apache/iceberg/hadoop/HadoopFileIOTest.java
index 636c94069d5c..0a195819e17c 100644
--- a/core/src/test/java/org/apache/iceberg/hadoop/HadoopFileIOTest.java
+++ b/core/src/test/java/org/apache/iceberg/hadoop/HadoopFileIOTest.java
@@ -30,8 +30,10 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.common.DynMethods;
 import org.apache.iceberg.io.BulkDeletionFailureException;
 import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.ResolvingFileIO;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Streams;
@@ -168,6 +170,19 @@ public void testHadoopFileIOJavaSerialization() throws IOException, ClassNotFoun
         .isEqualTo(testHadoopFileIO.properties());
   }
 
+  @Test
+  public void testResolvingFileIOLoad() {
+    ResolvingFileIO resolvingFileIO = new ResolvingFileIO();
+    resolvingFileIO.setConf(new Configuration());
+    resolvingFileIO.initialize(ImmutableMap.of());
+    FileIO result =
+        DynMethods.builder("io")
+            .hiddenImpl(ResolvingFileIO.class, String.class)
+            .build(resolvingFileIO)
+            .invoke("hdfs://foo/bar");
+    Assertions.assertThat(result).isInstanceOf(HadoopFileIO.class);
+  }
+
   private List<Path> createRandomFiles(Path parent, int count) {
     Vector<Path> paths = new Vector<>();
     random
diff --git a/core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java b/core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java
index 8745c45c72ea..ef4ec4a7d071 100644
--- a/core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java
+++ b/core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java
@@ -19,8 +19,16 @@
 package org.apache.iceberg.io;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.withSettings;
 
 import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
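Worth noting about the deleteFiles() implementation above: the delegate is resolved by peeking at the first path only, so a single bulk-delete call is expected to contain paths that all resolve to the same FileIO. A caller holding a mixed batch could group paths by scheme before delegating; the sketch below is illustrative only (helper name and the scheme parsing are assumptions, not part of the patch):

```java
import java.util.Collection;
import java.util.Map;
import org.apache.iceberg.io.BulkDeletionFailureException;
import org.apache.iceberg.io.ResolvingFileIO;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableListMultimap;
import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;

// Hypothetical helper: split a mixed batch so each deleteFiles() call only carries paths
// for a single scheme, matching the peek-based dispatch in ResolvingFileIO.
public class GroupedBulkDelete {
  public static void deleteAll(ResolvingFileIO io, Iterable<String> paths)
      throws BulkDeletionFailureException {
    // group by URI scheme ("" for scheme-less paths)
    ImmutableListMultimap<String, String> byScheme =
        Multimaps.index(
            paths,
            path -> {
              int idx = path.indexOf("://");
              return idx > 0 ? path.substring(0, idx) : "";
            });

    for (Map.Entry<String, Collection<String>> group : byScheme.asMap().entrySet()) {
      io.deleteFiles(group.getValue());
    }
  }
}
```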
 import org.apache.iceberg.TestHelpers;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.junit.jupiter.api.Test;
@@ -47,4 +55,23 @@ public void testResolvingFileIOJavaSerialization() throws IOException, ClassNotF
     FileIO roundTripSerializedFileIO = TestHelpers.roundTripSerialize(testResolvingFileIO);
     assertThat(roundTripSerializedFileIO.properties()).isEqualTo(testResolvingFileIO.properties());
   }
+
+  @Test
+  public void testEnsureInterfaceImplementation() {
+    ResolvingFileIO testResolvingFileIO = spy(new ResolvingFileIO());
+    testResolvingFileIO.setConf(new Configuration());
+    testResolvingFileIO.initialize(ImmutableMap.of());
+
+    String fileIONoMixins = mock(FileIO.class).getClass().getName();
+    doReturn(fileIONoMixins).when(testResolvingFileIO).implFromLocation(any());
+    assertThatThrownBy(() -> testResolvingFileIO.newInputFile("/file"))
+        .isInstanceOf(IllegalStateException.class);
+
+    String fileIOWithMixins =
+        mock(FileIO.class, withSettings().extraInterfaces(DelegateFileIO.class))
+            .getClass()
+            .getName();
+    doReturn(fileIOWithMixins).when(testResolvingFileIO).implFromLocation(any());
+    assertThatCode(() -> testResolvingFileIO.newInputFile("/file")).doesNotThrowAnyException();
+  }
 }
diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java
index 54af44e43da8..ecb36ecc2984 100644
--- a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java
+++ b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java
@@ -48,7 +48,7 @@ *
  * See Cloud Storage
  * Overview
  */
-public class GCSFileIO implements FileIO {
+public class GCSFileIO implements FileIO { // FIXME: add DelegateFileIO
   private static final Logger LOG = LoggerFactory.getLogger(GCSFileIO.class);
   private static final String DEFAULT_METRICS_IMPL =
       "org.apache.iceberg.hadoop.HadoopMetricsContext";
diff --git a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSFileIOTest.java b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSFileIOTest.java
index 4e91b3e0e753..d455161a9c77 100644
--- a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSFileIOTest.java
+++ b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSFileIOTest.java
@@ -30,14 +30,18 @@
 import java.nio.ByteBuffer;
 import java.util.Random;
 import java.util.stream.StreamSupport;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.common.DynMethods;
 import org.apache.iceberg.gcp.GCPProperties;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.IOUtil;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.ResolvingFileIO;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 public class GCSFileIOTest {
@@ -121,4 +125,18 @@ public void testGCSFileIOJavaSerialization() throws IOException, ClassNotFoundEx
     assertThat(testGCSFileIO.properties()).isEqualTo(roundTripSerializedFileIO.properties());
   }
+
+  @Disabled // FIXME
+  @Test
+  public void testResolvingFileIOLoad() {
+    ResolvingFileIO resolvingFileIO = new ResolvingFileIO();
+    resolvingFileIO.setConf(new Configuration());
+    resolvingFileIO.initialize(ImmutableMap.of());
+    FileIO result =
+        DynMethods.builder("io")
+            .hiddenImpl(ResolvingFileIO.class, String.class)
+            .build(resolvingFileIO)
+            .invoke("gs://foo/bar");
+    assertThat(result).isInstanceOf(GCSFileIO.class);
+  }
 }
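Taken together, the patch lets callers use prefix listing and bulk deletes through ResolvingFileIO without knowing which delegate serves a given location. A rough usage sketch, assuming the AWS and Hadoop modules are on the classpath (paths and property values are placeholders; gs:// locations would still fail here until GCSFileIO implements DelegateFileIO, per the FIXME above):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.io.FileInfo;
import org.apache.iceberg.io.ResolvingFileIO;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

public class ResolvingFileIOUsageExample {
  public static void main(String[] args) {
    try (ResolvingFileIO io = new ResolvingFileIO()) {
      io.setConf(new Configuration());
      io.initialize(ImmutableMap.of());

      // prefix operations are dispatched to S3FileIO because of the s3:// scheme
      for (FileInfo file : io.listPrefix("s3://example-bucket/warehouse/db/table/data/")) {
        System.out.println(file.location());
      }

      // bulk delete; all paths share one scheme, so one delegate handles the whole batch
      io.deleteFiles(
          ImmutableList.of(
              "s3://example-bucket/warehouse/db/table/data/file-a.parquet",
              "s3://example-bucket/warehouse/db/table/data/file-b.parquet"));

      // hdfs:// (and scheme-less) paths fall back to HadoopFileIO
      io.deletePrefix("hdfs://namenode/tmp/iceberg-staging/");
    }
  }
}
```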