diff --git a/api/src/main/java/org/apache/iceberg/io/FileRange.java b/api/src/main/java/org/apache/iceberg/io/FileRange.java new file mode 100644 index 000000000000..f6d5d9b41cca --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/io/FileRange.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.io; + +import java.io.EOFException; +import java.nio.ByteBuffer; +import java.util.concurrent.CompletableFuture; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +public class FileRange { + private final CompletableFuture<ByteBuffer> byteBuffer; + private final long offset; + private final int length; + + public FileRange(CompletableFuture<ByteBuffer> byteBuffer, long offset, int length) + throws EOFException { + Preconditions.checkNotNull(byteBuffer, "byteBuffer can't be null"); + Preconditions.checkArgument( + length >= 0, "Invalid length: %s in range (must be >= 0)", length); + Preconditions.checkArgument( + offset >= 0, "Invalid offset: %s in range (must be >= 0)", offset); + + this.byteBuffer = byteBuffer; + this.offset = offset; + this.length = length; + } + + public CompletableFuture<ByteBuffer> byteBuffer() { + return byteBuffer; + } + + public long offset() { + return offset; + } + + public int length() { + return length; + } +} diff --git a/api/src/main/java/org/apache/iceberg/io/RangeReadable.java b/api/src/main/java/org/apache/iceberg/io/RangeReadable.java index c2b113b2b27d..d860a5b76129 100644 --- a/api/src/main/java/org/apache/iceberg/io/RangeReadable.java +++ b/api/src/main/java/org/apache/iceberg/io/RangeReadable.java @@ -20,6 +20,12 @@ import java.io.Closeable; import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Comparator; +import java.util.List; +import java.util.function.IntFunction; +import java.util.stream.Collectors; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; /** * {@code RangeReadable} is an interface that allows for implementations of {@link InputFile} @@ -77,4 +83,61 @@ default void readFully(long position, byte[] buffer) throws IOException { default int readTail(byte[] buffer) throws IOException { return readTail(buffer, 0, buffer.length); } + + /** + * Read fully a list of file ranges asynchronously from this file. 
As a result of the call, each + range will have its {@code byteBuffer()} future completed with a ByteBuffer that contains the + data read from the file's range. + + * <p>The position returned by getPos() after readVectored() is undefined. + + * <p>If a file is changed while the readVectored() operation is in progress, the output is + undefined. Some ranges may have old data, some may have new and some may have both. + + * <p>While a readVectored() operation is in progress, normal read api calls may block. + * + * @param ranges the byte ranges to read + * @param allocate the function to allocate ByteBuffer + * @throws IOException any IOE. + * @throws IllegalArgumentException if any of ranges are invalid, or they overlap. + */ + default void readVectored(List<FileRange> ranges, IntFunction<ByteBuffer> allocate) + throws IOException { + List<FileRange> validatedRanges = sortRanges(ranges); + for (FileRange range : validatedRanges) { + ByteBuffer buffer = allocate.apply(range.length()); + readFully(range.offset(), buffer.array()); + range.byteBuffer().complete(buffer); + } + } + + static List<FileRange> sortRanges(final List<FileRange> input) { + Preconditions.checkNotNull(input, "Input list can't be null"); + + final List<FileRange> sortedRanges; + + // 2 because the input size can be 0/1, and then we want to skip sorting. + if (input.size() < 2) { + sortedRanges = input; + } else { + sortedRanges = + input.stream() + .sorted(Comparator.comparingLong(FileRange::offset)) + .collect(Collectors.toList()); + FileRange prev = null; + for (final FileRange current : sortedRanges) { + if (prev != null) { + Preconditions.checkArgument( + current.offset() >= prev.offset() + prev.length(), + "Overlapping ranges %s and %s", + prev, + current); + } + + prev = current; + } + } + + return sortedRanges; + } +} diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java index 6f68fbe150ff..5152beb87952 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java @@ -1340,6 +1340,7 @@ public <D> CloseableIterable<D> build() { optionsBuilder.withDecryption(fileDecryptionProperties); } + optionsBuilder.withUseHadoopVectoredIo(true); ParquetReadOptions options = optionsBuilder.build(); NameMapping mapping; diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIO.java 
b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIO.java index b36acdbd6f8d..e042504ca103 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIO.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIO.java @@ -18,9 +18,15 @@ */ package org.apache.iceberg.parquet; +import java.io.EOFException; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.function.IntFunction; +import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; @@ -29,11 +35,16 @@ import org.apache.iceberg.hadoop.HadoopOutputFile; import org.apache.iceberg.io.DelegatingInputStream; import org.apache.iceberg.io.DelegatingOutputStream; +import org.apache.iceberg.io.FileRange; +import org.apache.iceberg.io.RangeReadable; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.parquet.bytes.ByteBufferAllocator; import org.apache.parquet.hadoop.util.HadoopStreams; import org.apache.parquet.io.DelegatingPositionOutputStream; import org.apache.parquet.io.DelegatingSeekableInputStream; import org.apache.parquet.io.InputFile; import org.apache.parquet.io.OutputFile; +import org.apache.parquet.io.ParquetFileRange; import org.apache.parquet.io.PositionOutputStream; import org.apache.parquet.io.SeekableInputStream; @@ -91,6 +102,11 @@ static SeekableInputStream stream(org.apache.iceberg.io.SeekableInputStream stre return HadoopStreams.wrap((FSDataInputStream) wrapped); } } + + if (stream instanceof RangeReadable) { + return new ParquetRangeReadableInputStreamAdapter(stream); + } + return new ParquetInputStreamAdapter(stream); } @@ -123,6 +139,73 @@ public void seek(long newPos) throws IOException { } } + + @VisibleForTesting + static class ParquetRangeReadableInputStreamAdapter< + T extends org.apache.iceberg.io.SeekableInputStream & RangeReadable> + extends DelegatingSeekableInputStream implements RangeReadable { + private final T delegate; + + ParquetRangeReadableInputStreamAdapter(T delegate) { + super(delegate); + this.delegate = delegate; + } + + @Override + public long getPos() throws IOException { + return delegate.getPos(); + } + + @Override + public void seek(long newPos) throws IOException { + delegate.seek(newPos); + } + + @Override + public void readFully(long position, byte[] buffer, int offset, int length) throws IOException { + delegate.readFully(position, buffer, offset, length); + } + + @Override + public int readTail(byte[] buffer, int offset, int length) throws IOException { + return delegate.readTail(buffer, offset, length); + } + + @Override + public boolean readVectoredAvailable(ByteBufferAllocator allocate) { + return true; + } + + @Override + public void readVectored(List<ParquetFileRange> ranges, ByteBufferAllocator allocate) + throws IOException { + IntFunction<ByteBuffer> delegateAllocate = (allocate::allocate); + List<FileRange> delegateRange = convertRanges(ranges); + delegate.readVectored(delegateRange, delegateAllocate); + } + + private static List<FileRange> convertRanges(List<ParquetFileRange> ranges) { + return ranges.stream() + .map( + parquetFileRange -> { + CompletableFuture<ByteBuffer> future = new CompletableFuture<>(); + parquetFileRange.setDataReadFuture(future); + try { + return new FileRange( + parquetFileRange.getDataReadFuture(), + parquetFileRange.getOffset(), + parquetFileRange.getLength()); + } catch (EOFException e) { + throw new RuntimeIOException( + e, + "Failed to create range file for offset: %s and length: %s", + parquetFileRange.getOffset(), + parquetFileRange.getLength()); + } + }) + .collect(Collectors.toList()); + } + } + private static class ParquetOutputStreamAdapter extends DelegatingPositionOutputStream { private final org.apache.iceberg.io.PositionOutputStream delegate; diff --git 
a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetRangeReadableInputStreamAdapter.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetRangeReadableInputStreamAdapter.java new file mode 100644 index 000000000000..fbf26a48eb68 --- /dev/null +++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetRangeReadableInputStreamAdapter.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.parquet; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.List; +import java.util.function.IntFunction; +import org.apache.iceberg.io.FileRange; +import org.apache.iceberg.io.RangeReadable; +import org.apache.iceberg.io.SeekableInputStream; +import org.apache.parquet.bytes.ByteBufferAllocator; +import org.apache.parquet.io.ParquetFileRange; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +public class TestParquetRangeReadableInputStreamAdapter { + + @Test + public void testRangeReadableAdapterReadVectoredAvailable() { + MockRangeReadableStream mockStream = new MockRangeReadableStream(); + ParquetIO.ParquetRangeReadableInputStreamAdapter<MockRangeReadableStream> adapter = + new ParquetIO.ParquetRangeReadableInputStreamAdapter<>(mockStream); + ByteBufferAllocator allocator = Mockito.mock(ByteBufferAllocator.class); + + boolean result = adapter.readVectoredAvailable(allocator); + + assertThat(result).isTrue(); + } + + @Test + public void testRangeReadableAdapterReadVectored() throws IOException { + int length1 = 100; + int length2 = 50; + MockRangeReadableStream mockStream = new MockRangeReadableStream(); + ParquetIO.ParquetRangeReadableInputStreamAdapter<MockRangeReadableStream> adapter = + new ParquetIO.ParquetRangeReadableInputStreamAdapter<>(mockStream); + + ByteBufferAllocator allocator = Mockito.mock(ByteBufferAllocator.class); + when(allocator.allocate(length1)).thenReturn(ByteBuffer.wrap(new byte[length1])); + when(allocator.allocate(length2)).thenReturn(ByteBuffer.wrap(new byte[length2])); + + ParquetFileRange range1 = new ParquetFileRange(0, length1); + ParquetFileRange range2 = new ParquetFileRange(200, length2); + List<ParquetFileRange> ranges = Arrays.asList(range1, range2); + + adapter.readVectored(ranges, allocator); + + verify(allocator).allocate(length1); + verify(allocator).allocate(length2); + + assertThat(range1.getDataReadFuture().isDone()).isTrue(); + assertThat(range2.getDataReadFuture().isDone()).isTrue(); + + ByteBuffer buffer1 = range1.getDataReadFuture().join(); + ByteBuffer buffer2 = range2.getDataReadFuture().join(); + + // Verify data content from byte arrays + byte[] array1 = new byte[length1]; + byte[] array2 = new byte[length2]; + buffer1.get(array1); + buffer2.get(array2); + + byte[] expected1 = new byte[length1]; + byte[] expected2 = new byte[length2]; + + for (int i = 0; i < length1; i++) { + expected1[i] = (byte) i; + } + + for (int i = 0; i < length2; i++) { + expected2[i] = (byte) (200 + i); + } + + assertThat(array1).isEqualTo(expected1); + assertThat(array2).isEqualTo(expected2); + } + + private static class MockRangeReadableStream extends SeekableInputStream + implements RangeReadable { + + @Override + public long getPos() throws IOException { + return 0; + } + + @Override + public void seek(long newPos) throws IOException { + // no-op + } + + @Override + public int read() throws IOException { + return -1; + } + + @Override + public void readFully(long position, byte[] buffer, int offset, int length) throws IOException { + // no-op + } + + @Override + public int readTail(byte[] buffer, int offset, int length) throws IOException { + return -1; + } + + @Override + public void readVectored(List<FileRange> ranges, IntFunction<ByteBuffer> allocate) + throws IOException { + for (FileRange range : ranges) { + try { + ByteBuffer buffer = allocate.apply(range.length()); + + for (int i = 0; i < range.length(); i++) { + buffer.put((byte) (range.offset() + i)); + } + + buffer.flip(); + range.byteBuffer().complete(buffer); + } catch (Exception e) { + range.byteBuffer().completeExceptionally(e); + } + } + } + } +}