Skip to content
Closed
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 48 additions & 2 deletions core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.iceberg.io;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -30,15 +31,22 @@
import org.apache.iceberg.hadoop.SerializableConfiguration;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.PeekingIterator;
import org.apache.iceberg.util.SerializableMap;
import org.apache.iceberg.util.SerializableSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** FileIO implementation that uses location scheme to choose the correct FileIO implementation. */
public class ResolvingFileIO implements FileIO, HadoopConfigurable {
/**
* FileIO implementation that uses location scheme to choose the correct FileIO implementation.
* Delegate FileIO implementations should support the mixin interfaces {@link
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we want to make this a requirement we should probably just add these as perquisites in the init code?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense to me, I'll add that.

* SupportsPrefixOperations} and {@link SupportsBulkOperations}.
*/
public class ResolvingFileIO
implements FileIO, SupportsPrefixOperations, SupportsBulkOperations, HadoopConfigurable {
private static final Logger LOG = LoggerFactory.getLogger(ResolvingFileIO.class);
private static final String FALLBACK_IMPL = "org.apache.iceberg.hadoop.HadoopFileIO";
private static final String S3_FILE_IO_IMPL = "org.apache.iceberg.aws.s3.S3FileIO";
Expand Down Expand Up @@ -85,6 +93,44 @@ public void deleteFile(String location) {
io(location).deleteFile(location);
}

@Override
public void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailureException {
// peek at the first element to determine the type of FileIO
Iterator<String> originalIterator = pathsToDelete.iterator();
if (!originalIterator.hasNext()) {
return;
}

PeekingIterator<String> iterator = Iterators.peekingIterator(originalIterator);
FileIO fileIO = io(iterator.peek());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why don't we change the function io to return a FileIO which supportsPrefixOperations and BulkOperations if that's a requirement?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the suggestion, I made this change and removed the typecasts.

if (!(fileIO instanceof SupportsPrefixOperations)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We check prefix, but then throw an exception warning we don't support bulk

throw new UnsupportedOperationException(
"FileIO doesn't support bulk operations: " + fileIO.getClass().getName());
}

((SupportsBulkOperations) fileIO).deleteFiles(() -> iterator);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We didn't check this cast

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We also should probably pass through the original iterable. Just in case this is a closable iterable.

}

@Override
public Iterable<FileInfo> listPrefix(String prefix) {
FileIO fileIO = io(prefix);
if (!(fileIO instanceof SupportsPrefixOperations)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This breaks if the delegate can't actually list prefix

throw new UnsupportedOperationException(
"FileIO doesn't support prefix operations: " + fileIO.getClass().getName());
}
return ((SupportsPrefixOperations) fileIO).listPrefix(prefix);
}

@Override
public void deletePrefix(String prefix) {
FileIO fileIO = io(prefix);
if (!(fileIO instanceof SupportsPrefixOperations)) {
throw new UnsupportedOperationException(
"FileIO doesn't support prefix operations: " + fileIO.getClass().getName());
}
((SupportsPrefixOperations) fileIO).deletePrefix(prefix);
}

@Override
public Map<String, String> properties() {
return properties.immutableMap();
Expand Down