Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions api/src/main/java/org/apache/iceberg/io/DelegateFileIO.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.io;

/**
 * This interface is intended as an extension for FileIO implementations that support being a
 * delegate target.
 *
 * <p>It combines {@link SupportsPrefixOperations} and {@link SupportsBulkOperations} so that a
 * resolving/delegating FileIO can rely on prefix listing, prefix deletion, and bulk deletes being
 * available on every delegate it loads, without per-operation instanceof checks.
 */
public interface DelegateFileIO extends FileIO, SupportsPrefixOperations, SupportsBulkOperations {}
7 changes: 6 additions & 1 deletion aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.iceberg.common.DynConstructors;
import org.apache.iceberg.io.BulkDeletionFailureException;
import org.apache.iceberg.io.CredentialSupplier;
import org.apache.iceberg.io.DelegateFileIO;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.FileInfo;
import org.apache.iceberg.io.InputFile;
Expand Down Expand Up @@ -76,7 +77,11 @@
* will result in {@link org.apache.iceberg.exceptions.ValidationException}.
*/
public class S3FileIO
implements FileIO, SupportsBulkOperations, SupportsPrefixOperations, CredentialSupplier {
implements FileIO,
SupportsBulkOperations,
SupportsPrefixOperations,
CredentialSupplier,
DelegateFileIO {
private static final Logger LOG = LoggerFactory.getLogger(S3FileIO.class);
private static final String DEFAULT_METRICS_IMPL =
"org.apache.iceberg.hadoop.HadoopMetricsContext";
Expand Down
16 changes: 16 additions & 0 deletions aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,22 @@
import java.util.Random;
import java.util.UUID;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.Schema;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.aws.AwsProperties;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.common.DynMethods;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.io.BulkDeletionFailureException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.FileIOParser;
import org.apache.iceberg.io.IOUtil;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.io.ResolvingFileIO;
import org.apache.iceberg.jdbc.JdbcCatalog;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -361,6 +364,19 @@ public void testS3FileIOJavaSerialization() throws IOException, ClassNotFoundExc
.isEqualTo(testS3FileIO.properties());
}

@Test
public void testResolvingFileIOLoad() {
  // ResolvingFileIO should resolve an s3:// location to the S3FileIO delegate.
  ResolvingFileIO resolvingIO = new ResolvingFileIO();
  resolvingIO.setConf(new Configuration());
  resolvingIO.initialize(ImmutableMap.of());

  // io(String) is package-private/hidden, so invoke it reflectively.
  FileIO delegate =
      DynMethods.builder("io")
          .hiddenImpl(ResolvingFileIO.class, String.class)
          .build(resolvingIO)
          .invoke("s3://foo/bar");

  Assertions.assertThat(delegate).isInstanceOf(S3FileIO.class);
}

private void createRandomObjects(String prefix, int count) {
S3URI s3URI = new S3URI(prefix);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.BulkDeletionFailureException;
import org.apache.iceberg.io.DelegateFileIO;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.FileInfo;
import org.apache.iceberg.io.InputFile;
Expand All @@ -48,7 +49,11 @@
import org.slf4j.LoggerFactory;

public class HadoopFileIO
implements FileIO, HadoopConfigurable, SupportsPrefixOperations, SupportsBulkOperations {
implements FileIO,
HadoopConfigurable,
SupportsPrefixOperations,
SupportsBulkOperations,
DelegateFileIO {

private static final Logger LOG = LoggerFactory.getLogger(HadoopFileIO.class);
private static final String DELETE_FILE_PARALLELISM = "iceberg.hadoop.delete-file-parallelism";
Expand Down
61 changes: 51 additions & 10 deletions core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.iceberg.io;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -28,17 +29,26 @@
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.hadoop.HadoopConfigurable;
import org.apache.iceberg.hadoop.SerializableConfiguration;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.PeekingIterator;
import org.apache.iceberg.util.SerializableMap;
import org.apache.iceberg.util.SerializableSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** FileIO implementation that uses location scheme to choose the correct FileIO implementation. */
public class ResolvingFileIO implements FileIO, HadoopConfigurable {
/**
* FileIO implementation that uses location scheme to choose the correct FileIO implementation.
* Delegate FileIO implementations must implement the {@link DelegateFileIO} mixin interface,
* otherwise initialization will fail.
*/
public class ResolvingFileIO
implements FileIO, SupportsPrefixOperations, SupportsBulkOperations, HadoopConfigurable {
private static final Logger LOG = LoggerFactory.getLogger(ResolvingFileIO.class);
private static final String FALLBACK_IMPL = "org.apache.iceberg.hadoop.HadoopFileIO";
private static final String S3_FILE_IO_IMPL = "org.apache.iceberg.aws.s3.S3FileIO";
Expand All @@ -50,7 +60,7 @@ public class ResolvingFileIO implements FileIO, HadoopConfigurable {
"s3n", S3_FILE_IO_IMPL,
"gs", GCS_FILE_IO_IMPL);

private final Map<String, FileIO> ioInstances = Maps.newHashMap();
private final Map<String, DelegateFileIO> ioInstances = Maps.newHashMap();
private final AtomicBoolean isClosed = new AtomicBoolean(false);
private final transient StackTraceElement[] createStack;
private SerializableMap<String, String> properties;
Expand Down Expand Up @@ -85,6 +95,29 @@ public void deleteFile(String location) {
io(location).deleteFile(location);
}

@Override
public void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailureException {
// peek at the first element to determine the type of FileIO
Iterator<String> originalIterator = pathsToDelete.iterator();
if (!originalIterator.hasNext()) {
return;
}

PeekingIterator<String> iterator = Iterators.peekingIterator(originalIterator);
DelegateFileIO fileIO = io(iterator.peek());
fileIO.deleteFiles(() -> iterator);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did we resolve this? I think we probably want to pass the iterable here rather than the iterator to make sure we close everything properly

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No not yet, I'm still thinking through the best solution. Passing the original iterable could cause an additional request to be made, i.e. one additional request for the peek, so I was hoping to avoid that.

}

@Override
public Iterable<FileInfo> listPrefix(String prefix) {
  // Resolve the delegate for this prefix's scheme and forward the listing to it.
  DelegateFileIO delegate = io(prefix);
  return delegate.listPrefix(prefix);
}

@Override
public void deletePrefix(String prefix) {
  // Resolve the delegate for this prefix's scheme and forward the prefix deletion to it.
  DelegateFileIO delegate = io(prefix);
  delegate.deletePrefix(prefix);
}

@Override
public Map<String, String> properties() {
return properties.immutableMap();
Expand All @@ -100,14 +133,14 @@ public void initialize(Map<String, String> newProperties) {
@Override
public void close() {
if (isClosed.compareAndSet(false, true)) {
List<FileIO> instances = Lists.newArrayList();
List<DelegateFileIO> instances = Lists.newArrayList();

synchronized (ioInstances) {
instances.addAll(ioInstances.values());
ioInstances.clear();
}

for (FileIO io : instances) {
for (DelegateFileIO io : instances) {
io.close();
}
}
Expand All @@ -129,14 +162,15 @@ public Configuration getConf() {
return hadoopConf.get();
}

private FileIO io(String location) {
private DelegateFileIO io(String location) {
String impl = implFromLocation(location);
FileIO io = ioInstances.get(impl);
DelegateFileIO io = ioInstances.get(impl);
if (io != null) {
return io;
}

synchronized (ioInstances) {

// double check while holding the lock
io = ioInstances.get(impl);
if (io != null) {
Expand All @@ -145,12 +179,13 @@ private FileIO io(String location) {

Configuration conf = hadoopConf.get();

FileIO newFileIO;
try {
Map<String, String> props = Maps.newHashMap(properties);
// ResolvingFileIO is keeping track of the creation stacktrace, so no need to do the same in
// S3FileIO.
props.put("init-creation-stacktrace", "false");
io = CatalogUtil.loadFileIO(impl, props, conf);
newFileIO = CatalogUtil.loadFileIO(impl, props, conf);
} catch (IllegalArgumentException e) {
if (impl.equals(FALLBACK_IMPL)) {
// no implementation to fall back to, throw the exception
Expand All @@ -163,7 +198,7 @@ private FileIO io(String location) {
FALLBACK_IMPL,
e);
try {
io = CatalogUtil.loadFileIO(FALLBACK_IMPL, properties, conf);
newFileIO = CatalogUtil.loadFileIO(FALLBACK_IMPL, properties, conf);
} catch (IllegalArgumentException suppressed) {
LOG.warn(
"Failed to load FileIO implementation: {} (fallback)", FALLBACK_IMPL, suppressed);
Expand All @@ -175,13 +210,19 @@ private FileIO io(String location) {
}
}

Preconditions.checkState(
newFileIO instanceof DelegateFileIO,
"FileIO does not implement DelegateFileIO: " + newFileIO.getClass().getName());

io = (DelegateFileIO) newFileIO;
ioInstances.put(impl, io);
}

return io;
}

private static String implFromLocation(String location) {
@VisibleForTesting
String implFromLocation(String location) {
return SCHEME_TO_FILE_IO.getOrDefault(scheme(location), FALLBACK_IMPL);
}

Expand Down
15 changes: 15 additions & 0 deletions core/src/test/java/org/apache/iceberg/hadoop/HadoopFileIOTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.common.DynMethods;
import org.apache.iceberg.io.BulkDeletionFailureException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.ResolvingFileIO;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Streams;
Expand Down Expand Up @@ -168,6 +170,19 @@ public void testHadoopFileIOJavaSerialization() throws IOException, ClassNotFoun
.isEqualTo(testHadoopFileIO.properties());
}

@Test
public void testResolvingFileIOLoad() {
  // ResolvingFileIO should resolve an hdfs:// location to the HadoopFileIO delegate.
  ResolvingFileIO resolvingIO = new ResolvingFileIO();
  resolvingIO.setConf(new Configuration());
  resolvingIO.initialize(ImmutableMap.of());

  // io(String) is package-private/hidden, so invoke it reflectively.
  FileIO delegate =
      DynMethods.builder("io")
          .hiddenImpl(ResolvingFileIO.class, String.class)
          .build(resolvingIO)
          .invoke("hdfs://foo/bar");

  Assertions.assertThat(delegate).isInstanceOf(HadoopFileIO.class);
}

private List<Path> createRandomFiles(Path parent, int count) {
Vector<Path> paths = new Vector<>();
random
Expand Down
27 changes: 27 additions & 0 deletions core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,16 @@
package org.apache.iceberg.io;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.withSettings;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.junit.jupiter.api.Test;
Expand All @@ -47,4 +55,23 @@ public void testResolvingFileIOJavaSerialization() throws IOException, ClassNotF
FileIO roundTripSerializedFileIO = TestHelpers.roundTripSerialize(testResolvingFileIO);
assertThat(roundTripSerializedFileIO.properties()).isEqualTo(testResolvingFileIO.properties());
}

@Test
public void testEnsureInterfaceImplementation() {
ResolvingFileIO testResolvingFileIO = spy(new ResolvingFileIO());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is ok, but what I was hoping we would have coverage for was that all delegates properly worked. This may be very difficult, I'm just looking for a way to confirm that the class will work at runtime. If we can force all the checks at compile time I'd be ok with that as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We wouldn't be able to test the actual delegates without having the dependencies here (i.e. iceberg-aws and iceberg-gcp). I'll see if a service pattern could work where the delegate types register themselves with ResolvingFileIO, then we can have the compile time check.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I looked into using Java SPI, but I don't think it is a good idea as it could cause problems with shadow jars. Also it isn't much different than this solution. We can't register with ResolvingFileIO from the delegate class as the delegate class might not be loaded at all yet.

One option, somewhat similar to #7976 would be to remove SupportsPrefixOperations and then provide a default implementation for bulk delete for the orphan file case so the delegate doesn't need the mixin interface. Though I was hoping we could have the prefix operations also as they won't be accessible at all.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added tests for each of the delegate types to ensure they load. Though that doesn't prevent a new class from being added to the list that doesn't implement the interface.

testResolvingFileIO.setConf(new Configuration());
testResolvingFileIO.initialize(ImmutableMap.of());

String fileIONoMixins = mock(FileIO.class).getClass().getName();
doReturn(fileIONoMixins).when(testResolvingFileIO).implFromLocation(any());
assertThatThrownBy(() -> testResolvingFileIO.newInputFile("/file"))
.isInstanceOf(IllegalStateException.class);

String fileIOWithMixins =
mock(FileIO.class, withSettings().extraInterfaces(DelegateFileIO.class))
.getClass()
.getName();
doReturn(fileIOWithMixins).when(testResolvingFileIO).implFromLocation(any());
assertThatCode(() -> testResolvingFileIO.newInputFile("/file")).doesNotThrowAnyException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
* <p>See <a href="https://cloud.google.com/storage/docs/folders#overview">Cloud Storage
* Overview</a>
*/
public class GCSFileIO implements FileIO {
// Implements DelegateFileIO so ResolvingFileIO can delegate gs:// locations to this class —
// ResolvingFileIO rejects any delegate lacking the mixin with an IllegalStateException.
// NOTE(review): this requires GCSFileIO to provide the SupportsPrefixOperations and
// SupportsBulkOperations methods (listPrefix/deletePrefix/deleteFiles) — confirm they exist
// in the class body.
public class GCSFileIO implements FileIO, org.apache.iceberg.io.DelegateFileIO {
private static final Logger LOG = LoggerFactory.getLogger(GCSFileIO.class);
private static final String DEFAULT_METRICS_IMPL =
"org.apache.iceberg.hadoop.HadoopMetricsContext";
Expand Down
18 changes: 18 additions & 0 deletions gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSFileIOTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,18 @@
import java.nio.ByteBuffer;
import java.util.Random;
import java.util.stream.StreamSupport;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.common.DynMethods;
import org.apache.iceberg.gcp.GCPProperties;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.IOUtil;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.io.ResolvingFileIO;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

public class GCSFileIOTest {
Expand Down Expand Up @@ -121,4 +125,18 @@ public void testGCSFileIOJavaSerialization() throws IOException, ClassNotFoundEx

assertThat(testGCSFileIO.properties()).isEqualTo(roundTripSerializedFileIO.properties());
}

@Test
public void testResolvingFileIOLoad() {
  // ResolvingFileIO should resolve a gs:// location to the GCSFileIO delegate. This requires
  // GCSFileIO to implement DelegateFileIO, since ResolvingFileIO rejects delegates without it.
  ResolvingFileIO resolvingFileIO = new ResolvingFileIO();
  resolvingFileIO.setConf(new Configuration());
  resolvingFileIO.initialize(ImmutableMap.of());
  // io(String) is package-private/hidden, so invoke it reflectively.
  FileIO result =
      DynMethods.builder("io")
          .hiddenImpl(ResolvingFileIO.class, String.class)
          .build(resolvingFileIO)
          .invoke("gs://foo/bar");
  assertThat(result).isInstanceOf(GCSFileIO.class);
}
}