25 changes: 25 additions & 0 deletions api/src/main/java/org/apache/iceberg/io/DelegateFileIO.java
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.io;

/**
* This interface is intended as an extension for FileIO implementations that support being a
* delegate target.
*/
public interface DelegateFileIO extends FileIO, SupportsPrefixOperations, SupportsBulkOperations {}
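DelegateFileIO adds no methods of its own; it only bundles the FileIO, SupportsPrefixOperations, and SupportsBulkOperations contracts into a single type that ResolvingFileIO can require of its delegates. A minimal sketch of a custom implementation opting in to the mixin (the class name and store-specific details are hypothetical, not part of this PR):

    import org.apache.iceberg.io.BulkDeletionFailureException;
    import org.apache.iceberg.io.DelegateFileIO;
    import org.apache.iceberg.io.FileInfo;
    import org.apache.iceberg.io.InputFile;
    import org.apache.iceberg.io.OutputFile;

    // Hypothetical example: one implements clause now declares all three
    // capabilities that ResolvingFileIO expects from a delegate.
    public class ExampleObjectStoreFileIO implements DelegateFileIO {

      @Override
      public InputFile newInputFile(String path) {
        throw new UnsupportedOperationException("sketch only: open a read handle");
      }

      @Override
      public OutputFile newOutputFile(String path) {
        throw new UnsupportedOperationException("sketch only: open a write handle");
      }

      @Override
      public void deleteFile(String path) {
        throw new UnsupportedOperationException("sketch only: delete a single object");
      }

      // from SupportsPrefixOperations
      @Override
      public Iterable<FileInfo> listPrefix(String prefix) {
        throw new UnsupportedOperationException("sketch only: list objects under a prefix");
      }

      @Override
      public void deletePrefix(String prefix) {
        throw new UnsupportedOperationException("sketch only: delete objects under a prefix");
      }

      // from SupportsBulkOperations
      @Override
      public void deleteFiles(Iterable<String> paths) throws BulkDeletionFailureException {
        throw new UnsupportedOperationException("sketch only: batched delete");
      }
    }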
7 changes: 2 additions & 5 deletions aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
@@ -33,12 +33,10 @@
import org.apache.iceberg.common.DynConstructors;
import org.apache.iceberg.io.BulkDeletionFailureException;
import org.apache.iceberg.io.CredentialSupplier;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.DelegateFileIO;
import org.apache.iceberg.io.FileInfo;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.io.SupportsBulkOperations;
import org.apache.iceberg.io.SupportsPrefixOperations;
import org.apache.iceberg.metrics.MetricsContext;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -75,8 +73,7 @@
* schemes s3a, s3n, https are also treated as s3 file paths. Using this FileIO with other schemes
* will result in {@link org.apache.iceberg.exceptions.ValidationException}.
*/
public class S3FileIO
implements FileIO, SupportsBulkOperations, SupportsPrefixOperations, CredentialSupplier {
public class S3FileIO implements CredentialSupplier, DelegateFileIO {
private static final Logger LOG = LoggerFactory.getLogger(S3FileIO.class);
private static final String DEFAULT_METRICS_IMPL =
"org.apache.iceberg.hadoop.HadoopMetricsContext";
16 changes: 16 additions & 0 deletions aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIO.java
@@ -35,19 +35,22 @@
import java.util.Random;
import java.util.UUID;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.Schema;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.aws.AwsProperties;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.common.DynMethods;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.io.BulkDeletionFailureException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.FileIOParser;
import org.apache.iceberg.io.IOUtil;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.io.ResolvingFileIO;
import org.apache.iceberg.jdbc.JdbcCatalog;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -361,6 +364,19 @@ public void testS3FileIOJavaSerialization() throws IOException, ClassNotFoundException
.isEqualTo(testS3FileIO.properties());
}

@Test
public void testResolvingFileIOLoad() {
ResolvingFileIO resolvingFileIO = new ResolvingFileIO();
resolvingFileIO.setConf(new Configuration());
resolvingFileIO.initialize(ImmutableMap.of());
FileIO result =
DynMethods.builder("io")
.hiddenImpl(ResolvingFileIO.class, String.class)
.build(resolvingFileIO)
.invoke("s3://foo/bar");
Assertions.assertThat(result).isInstanceOf(S3FileIO.class);
}

private void createRandomObjects(String prefix, int count) {
S3URI s3URI = new S3URI(prefix);

core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java
@@ -32,12 +32,10 @@
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.BulkDeletionFailureException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.DelegateFileIO;
import org.apache.iceberg.io.FileInfo;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.io.SupportsBulkOperations;
import org.apache.iceberg.io.SupportsPrefixOperations;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Streams;
import org.apache.iceberg.util.SerializableMap;
@@ -47,8 +45,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HadoopFileIO
implements FileIO, HadoopConfigurable, SupportsPrefixOperations, SupportsBulkOperations {
public class HadoopFileIO implements HadoopConfigurable, DelegateFileIO {

private static final Logger LOG = LoggerFactory.getLogger(HadoopFileIO.class);
private static final String DELETE_FILE_PARALLELISM = "iceberg.hadoop.delete-file-parallelism";
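With S3FileIO and HadoopFileIO both switched to the single mixin, callers that previously had to test two capability interfaces can check one. A hedged sketch (the helper below is illustrative, not code from this PR):

    import org.apache.iceberg.io.DelegateFileIO;
    import org.apache.iceberg.io.FileIO;

    public class PrefixDeleteHelper {
      // Hypothetical helper: one instanceof check now covers prefix listing,
      // prefix deletion, and bulk deletes.
      static void deleteLocation(FileIO io, String location) {
        if (io instanceof DelegateFileIO) {
          ((DelegateFileIO) io).deletePrefix(location);
        } else {
          throw new UnsupportedOperationException(
              "FileIO does not support prefix operations: " + io.getClass().getName());
        }
      }
    }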
65 changes: 37 additions & 28 deletions core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java
@@ -31,18 +31,22 @@
import org.apache.iceberg.hadoop.SerializableConfiguration;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.SerializableMap;
import org.apache.iceberg.util.SerializableSupplier;
import org.apache.iceberg.util.Tasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** FileIO implementation that uses location scheme to choose the correct FileIO implementation. */
public class ResolvingFileIO implements FileIO, HadoopConfigurable, SupportsBulkOperations {
/**
* FileIO implementation that uses location scheme to choose the correct FileIO implementation.
* Delegate FileIO implementations must implement the {@link DelegateFileIO} mixin interface,
* otherwise initialization will fail.
*/
public class ResolvingFileIO implements HadoopConfigurable, DelegateFileIO {
private static final Logger LOG = LoggerFactory.getLogger(ResolvingFileIO.class);
private static final int BATCH_SIZE = 100_000;
private static final String FALLBACK_IMPL = "org.apache.iceberg.hadoop.HadoopFileIO";
@@ -55,7 +59,7 @@ public class ResolvingFileIO implements FileIO, HadoopConfigurable, SupportsBulkOperations {
"s3n", S3_FILE_IO_IMPL,
"gs", GCS_FILE_IO_IMPL);

private final Map<String, FileIO> ioInstances = Maps.newConcurrentMap();
private final Map<String, DelegateFileIO> ioInstances = Maps.newConcurrentMap();
private final AtomicBoolean isClosed = new AtomicBoolean(false);
private final transient StackTraceElement[] createStack;
private SerializableMap<String, String> properties;
@@ -95,25 +99,12 @@ public void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailureException {
Iterators.partition(pathsToDelete.iterator(), BATCH_SIZE)
.forEachRemaining(
partitioned -> {
Map<FileIO, List<String>> pathByFileIO =
Map<DelegateFileIO, List<String>> pathByFileIO =
[Review thread on this hunk]

Contributor commented:

    We can just call the delegate directly now, I don't believe there is a need for this logic anymore.

Contributor (author) replied:

    do you think we should still batch here or would we want to essentially use the same implementation you had?

        // peek at the first element to determine the type of FileIO
        Iterator<String> originalIterator = pathsToDelete.iterator();
        if (!originalIterator.hasNext()) {
          return;
        }

        PeekingIterator<String> iterator = Iterators.peekingIterator(originalIterator);
        DelegateFileIO fileIO = io(iterator.peek());
        fileIO.deleteFiles(() -> iterator);

    To me it seems like batching should probably be handled by the underlying FileIO implementation, but just wanted to see what others think?

bryanck (Contributor) commented on Aug 22, 2023:

    Actually, it is probably better to keep what you have as there may be multiple FileIO types involved, and the peek method wasn't ideal.
partitioned.stream().collect(Collectors.groupingBy(this::io));
for (Map.Entry<FileIO, List<String>> entries : pathByFileIO.entrySet()) {
FileIO io = entries.getKey();
for (Map.Entry<DelegateFileIO, List<String>> entries : pathByFileIO.entrySet()) {
DelegateFileIO io = entries.getKey();
List<String> filePaths = entries.getValue();
if (io instanceof SupportsBulkOperations) {
((SupportsBulkOperations) io).deleteFiles(filePaths);
} else {
LOG.warn(
"IO {} does not support bulk operations. Using non-bulk deletes.",
io.getClass().getName());
Tasks.Builder<String> deleteTasks =
Tasks.foreach(filePaths)
.noRetry()
.suppressFailureWhenFinished()
.onFailure(
(file, exc) -> LOG.warn("Failed to delete file: {}", file, exc));
deleteTasks.run(io::deleteFile);
}
io.deleteFiles(filePaths);
}
});
}
@@ -133,12 +124,12 @@ public void initialize(Map<String, String> newProperties) {
@Override
public void close() {
if (isClosed.compareAndSet(false, true)) {
List<FileIO> instances = Lists.newArrayList();
List<DelegateFileIO> instances = Lists.newArrayList();

instances.addAll(ioInstances.values());
ioInstances.clear();

for (FileIO io : instances) {
for (DelegateFileIO io : instances) {
io.close();
}
}
@@ -161,9 +152,9 @@ public Configuration getConf() {
}

@VisibleForTesting
FileIO io(String location) {
DelegateFileIO io(String location) {
String impl = implFromLocation(location);
FileIO io = ioInstances.get(impl);
DelegateFileIO io = ioInstances.get(impl);
if (io != null) {
if (io instanceof HadoopConfigurable && ((HadoopConfigurable) io).getConf() == null) {
synchronized (io) {
@@ -181,13 +172,14 @@ FileIO io(String location) {
impl,
key -> {
Configuration conf = hadoopConf.get();
FileIO fileIO;

try {
Map<String, String> props = Maps.newHashMap(properties);
// ResolvingFileIO is keeping track of the creation stacktrace, so no need to do the
// same in S3FileIO.
props.put("init-creation-stacktrace", "false");
return CatalogUtil.loadFileIO(key, props, conf);
fileIO = CatalogUtil.loadFileIO(key, props, conf);
} catch (IllegalArgumentException e) {
if (key.equals(FALLBACK_IMPL)) {
// no implementation to fall back to, throw the exception
@@ -200,7 +192,7 @@ FileIO io(String location) {
FALLBACK_IMPL,
e);
try {
return CatalogUtil.loadFileIO(FALLBACK_IMPL, properties, conf);
fileIO = CatalogUtil.loadFileIO(FALLBACK_IMPL, properties, conf);
} catch (IllegalArgumentException suppressed) {
LOG.warn(
"Failed to load FileIO implementation: {} (fallback)",
@@ -213,10 +205,17 @@ FileIO io(String location) {
}
}
}

Preconditions.checkState(
fileIO instanceof DelegateFileIO,
"FileIO does not implement DelegateFileIO: " + fileIO.getClass().getName());

return (DelegateFileIO) fileIO;
});
}

private static String implFromLocation(String location) {
@VisibleForTesting
String implFromLocation(String location) {
return SCHEME_TO_FILE_IO.getOrDefault(scheme(location), FALLBACK_IMPL);
}

@@ -252,4 +251,14 @@ protected void finalize() throws Throwable {
}
}
}

@Override
public Iterable<FileInfo> listPrefix(String prefix) {
return io(prefix).listPrefix(prefix);
}

@Override
public void deletePrefix(String prefix) {
io(prefix).deletePrefix(prefix);
}
}
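The new listPrefix and deletePrefix overrides, together with the simplified deleteFiles, mean one ResolvingFileIO instance can serve prefix and bulk operations across schemes. A usage sketch, assuming made-up bucket and namenode names (each call is routed to the delegate whose scheme matches):

    import java.util.Arrays;
    import java.util.Collections;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.io.ResolvingFileIO;

    public class ResolvingFileIOUsage {
      public static void main(String[] args) {
        ResolvingFileIO io = new ResolvingFileIO();
        io.setConf(new Configuration());
        io.initialize(Collections.emptyMap());

        // A mixed-scheme batch: deleteFiles groups the paths by resolved
        // delegate, then hands each group to that delegate's bulk delete.
        io.deleteFiles(
            Arrays.asList(
                "s3://example-bucket/data/file-a.parquet", // S3FileIO
                "hdfs://namenode:8020/warehouse/file-b")); // HadoopFileIO

        // Prefix operations resolve the same way: the scheme picks the delegate.
        io.listPrefix("s3://example-bucket/data/")
            .forEach(f -> System.out.println(f.location() + " " + f.size()));

        io.close();
      }
    }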
15 changes: 15 additions & 0 deletions core/src/test/java/org/apache/iceberg/hadoop/HadoopFileIOTest.java
@@ -30,8 +30,10 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.common.DynMethods;
import org.apache.iceberg.io.BulkDeletionFailureException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.ResolvingFileIO;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Streams;
@@ -168,6 +170,19 @@ public void testHadoopFileIOJavaSerialization() throws IOException, ClassNotFoundException
.isEqualTo(testHadoopFileIO.properties());
}

@Test
public void testResolvingFileIOLoad() {
ResolvingFileIO resolvingFileIO = new ResolvingFileIO();
resolvingFileIO.setConf(new Configuration());
resolvingFileIO.initialize(ImmutableMap.of());
FileIO result =
DynMethods.builder("io")
.hiddenImpl(ResolvingFileIO.class, String.class)
.build(resolvingFileIO)
.invoke("hdfs://foo/bar");
Assertions.assertThat(result).isInstanceOf(HadoopFileIO.class);
}

private List<Path> createRandomFiles(Path parent, int count) {
Vector<Path> paths = new Vector<>();
random
65 changes: 47 additions & 18 deletions core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java
@@ -19,9 +19,13 @@
package org.apache.iceberg.io;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.withSettings;

import java.io.IOException;
import java.util.List;
Expand All @@ -33,7 +37,6 @@
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.inmemory.InMemoryFileIO;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -130,27 +133,53 @@ public void resolveFileIOBulkDeletion() throws IOException {
}

@Test
public void resolveFileIONonBulkDeletion() {
public void delegateFileIOWithPrefixBasedSupport() throws IOException {
ResolvingFileIO resolvingFileIO = spy(new ResolvingFileIO());
String parentPath = "inmemory://foo.db/bar";
// configure delegation IO
InMemoryFileIO delegation = new InMemoryFileIO();
doReturn(delegation).when(resolvingFileIO).io(anyString());
// write
byte[] someData = "some data".getBytes();
List<String> randomFilePaths =
Configuration hadoopConf = new Configuration();
FileSystem fs = FileSystem.getLocal(hadoopConf);
Path parent = new Path(temp.toUri());
HadoopFileIO delegate = new HadoopFileIO(hadoopConf);
doReturn(delegate).when(resolvingFileIO).io(anyString());

List<Path> paths =
IntStream.range(1, 10)
.mapToObj(i -> parentPath + "-" + i + "-" + UUID.randomUUID())
.mapToObj(i -> new Path(parent, "random-" + i + "-" + UUID.randomUUID()))
.collect(Collectors.toList());
for (String randomFilePath : randomFilePaths) {
delegation.addFile(randomFilePath, someData);
assertThat(delegation.fileExists(randomFilePath)).isTrue();
for (Path path : paths) {
fs.createNewFile(path);
assertThat(delegate.newInputFile(path.toString()).exists()).isTrue();
}
// non-bulk deletion
resolvingFileIO.deleteFiles(randomFilePaths);

for (String path : randomFilePaths) {
assertThat(delegation.fileExists(path)).isFalse();
}
paths.stream()
.map(Path::toString)
.forEach(
path -> {
// HadoopFileIO can only list prefixes that match the full path
assertThat(resolvingFileIO.listPrefix(path)).hasSize(1);
resolvingFileIO.deletePrefix(path);
assertThat(delegate.newInputFile(path).exists()).isFalse();
});
}

@Test
public void delegateFileIOWithAndWithoutMixins() {
ResolvingFileIO resolvingFileIO = spy(new ResolvingFileIO());
resolvingFileIO.setConf(new Configuration());
resolvingFileIO.initialize(ImmutableMap.of());

String fileIONoMixins = mock(FileIO.class).getClass().getName();
doReturn(fileIONoMixins).when(resolvingFileIO).implFromLocation(any());
assertThatThrownBy(() -> resolvingFileIO.newInputFile("/file"))
.isInstanceOf(IllegalStateException.class)
.hasMessageStartingWith(
"FileIO does not implement DelegateFileIO: org.apache.iceberg.io.FileIO");

String fileIOWithMixins =
mock(FileIO.class, withSettings().extraInterfaces(DelegateFileIO.class))
.getClass()
.getName();
doReturn(fileIOWithMixins).when(resolvingFileIO).implFromLocation(any());
// being null is ok here as long as the code doesn't throw an exception
assertThat(resolvingFileIO.newInputFile("/file")).isNull();
}
}