From 8f05471c4e31c1cde875fa84038887ca41c4ee5f Mon Sep 17 00:00:00 2001 From: stubz151 Date: Fri, 5 Sep 2025 14:35:09 +0100 Subject: [PATCH 1/3] core: Adding read vector to range readable interface and adding mapper to parquet stream. --- .../java/org/apache/iceberg/io/FileRange.java | 46 +++++++ .../org/apache/iceberg/io/RangeReadable.java | 41 ++++++ .../iceberg/util/VectoredReadUtils.java | 120 ++++++++++++++++++ .../org/apache/iceberg/parquet/Parquet.java | 1 + .../org/apache/iceberg/parquet/ParquetIO.java | 70 ++++++++++ 5 files changed, 278 insertions(+) create mode 100644 api/src/main/java/org/apache/iceberg/io/FileRange.java create mode 100644 api/src/main/java/org/apache/iceberg/util/VectoredReadUtils.java diff --git a/api/src/main/java/org/apache/iceberg/io/FileRange.java b/api/src/main/java/org/apache/iceberg/io/FileRange.java new file mode 100644 index 000000000000..4fcaece0245a --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/io/FileRange.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.io; + +import java.nio.ByteBuffer; +import java.util.concurrent.CompletableFuture; + +public class FileRange { + private final CompletableFuture byteBuffer; + private final long offset; + private final int length; + + public FileRange(CompletableFuture byteBuffer, long offset, int length) { + this.byteBuffer = byteBuffer; + this.offset = offset; + this.length = length; + } + + public CompletableFuture byteBuffer() { + return byteBuffer; + } + + public long offset() { + return offset; + } + + public int length() { + return length; + } +} diff --git a/api/src/main/java/org/apache/iceberg/io/RangeReadable.java b/api/src/main/java/org/apache/iceberg/io/RangeReadable.java index c2b113b2b27d..d734e92a2ce6 100644 --- a/api/src/main/java/org/apache/iceberg/io/RangeReadable.java +++ b/api/src/main/java/org/apache/iceberg/io/RangeReadable.java @@ -20,6 +20,10 @@ import java.io.Closeable; import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.function.IntFunction; +import org.apache.iceberg.util.VectoredReadUtils; /** * {@code RangeReadable} is an interface that allows for implementations of {@link InputFile} @@ -77,4 +81,41 @@ default void readFully(long position, byte[] buffer) throws IOException { default int readTail(byte[] buffer) throws IOException { return readTail(buffer, 0, buffer.length); } + + /** + * Is the {@link #readVectored(List, IntFunction)} method available? + * + * @param allocate the allocator to use for allocating ByteBuffers + * @return True if the operation is considered available for this allocator. + */ + default boolean readVectoredAvailable(IntFunction allocate) { + return true; + } + + /** + * Read fully a list of file ranges asynchronously from this file. As a result of the call, each + * range will have FileRange.setData(CompletableFuture) called with a future that when complete + * will have a ByteBuffer with the data from the file's range. + * + *

The position returned by getPos() after readVectored() is undefined. + * + *

If a file is changed while the readVectored() operation is in progress, the output is + * undefined. Some ranges may have old data, some may have new and some may have both. + * + *

While a readVectored() operation is in progress, normal read api calls may block. + * + * @param ranges the byte ranges to read + * @param allocate the function to allocate ByteBuffer + * @throws IOException any IOE. + * @throws IllegalArgumentException if the any of ranges are invalid, or they overlap. + */ + default void readVectored(List ranges, IntFunction allocate) + throws IOException { + List validatedRanges = VectoredReadUtils.validateAndSortRanges(ranges); + for (FileRange range : validatedRanges) { + ByteBuffer buffer = allocate.apply(range.length()); + readFully(range.offset(), buffer.array()); + range.byteBuffer().complete(buffer); + } + } } diff --git a/api/src/main/java/org/apache/iceberg/util/VectoredReadUtils.java b/api/src/main/java/org/apache/iceberg/util/VectoredReadUtils.java new file mode 100644 index 000000000000..41abf2d27e4e --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/util/VectoredReadUtils.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.util; + +import java.io.EOFException; +import java.util.Comparator; +import java.util.List; +import org.apache.iceberg.io.FileRange; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Utils class for vectoredReads, to help with things like range validation. Most of the code in + * this class is written by @mukundthakur, and taken from + * /hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java (thank you!). + */ +public final class VectoredReadUtils { + private VectoredReadUtils() {} + + private static final Logger LOG = LoggerFactory.getLogger(VectoredReadUtils.class); + + /** + * Validate a list of ranges (including overlapping checks) and return the sorted list. + * + *

Two ranges overlap when the start offset of second is less than the end offset of first. End + * offset is calculated as start offset + length. + * + * @param input input list + * @return a new sorted list. + * @throws IllegalArgumentException if there are overlapping ranges or a range element is invalid + * (other than with negative offset) + * @throws EOFException if the last range extends beyond the end of the file supplied or a range + * offset is negative + */ + public static List validateAndSortRanges(final List input) + throws EOFException { + + Preconditions.checkNotNull(input, "Null input list"); + + if (input.isEmpty()) { + // this may seem a pathological case, but it was valid + // before and somehow Spark can call it through parquet. + LOG.debug("Empty input list"); + return input; + } + + final List sortedRanges; + + if (input.size() == 1) { + validateRangeRequest(input.get(0)); + sortedRanges = input; + } else { + sortedRanges = sortRangeList(input); + FileRange prev = null; + for (final FileRange current : sortedRanges) { + validateRangeRequest(current); + if (prev != null) { + Preconditions.checkArgument( + current.offset() >= prev.offset() + prev.length(), + "Overlapping ranges %s and %s", + prev, + current); + } + prev = current; + } + } + + return sortedRanges; + } + + /** + * Validate a single range. + * + * @param range range to validate. + * @return the range. + * @throws IllegalArgumentException the range length is negative or other invalid condition is met + * other than the those which raise EOFException or NullPointerException. + * @throws EOFException the range offset is negative + * @throws NullPointerException if the range is null. + */ + public static FileRange validateRangeRequest(FileRange range) throws EOFException { + Preconditions.checkNotNull(range, "range is null"); + + Preconditions.checkArgument(range.length() >= 0, "length is negative in %s", range); + if (range.offset() < 0) { + throw new EOFException("position is negative in range " + range); + } + return range; + } + + /** + * Sort the input ranges by offset; no validation is done. + * + * @param input input ranges. + * @return a new list of the ranges, sorted by offset. + */ + public static List sortRangeList(List input) { + final List l = Lists.newArrayList(input); + l.sort(Comparator.comparingLong(FileRange::offset)); + return l; + } +} diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java index 6f68fbe150ff..5152beb87952 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java @@ -1340,6 +1340,7 @@ public CloseableIterable build() { optionsBuilder.withDecryption(fileDecryptionProperties); } + optionsBuilder.withUseHadoopVectoredIo(true); ParquetReadOptions options = optionsBuilder.build(); NameMapping mapping; diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIO.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIO.java index b36acdbd6f8d..af6d451c3803 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIO.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIO.java @@ -21,6 +21,11 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.function.IntFunction; +import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; @@ -29,11 +34,15 @@ import org.apache.iceberg.hadoop.HadoopOutputFile; import org.apache.iceberg.io.DelegatingInputStream; import org.apache.iceberg.io.DelegatingOutputStream; +import org.apache.iceberg.io.FileRange; +import org.apache.iceberg.io.RangeReadable; +import org.apache.parquet.bytes.ByteBufferAllocator; import org.apache.parquet.hadoop.util.HadoopStreams; import org.apache.parquet.io.DelegatingPositionOutputStream; import org.apache.parquet.io.DelegatingSeekableInputStream; import org.apache.parquet.io.InputFile; import org.apache.parquet.io.OutputFile; +import org.apache.parquet.io.ParquetFileRange; import org.apache.parquet.io.PositionOutputStream; import org.apache.parquet.io.SeekableInputStream; @@ -91,6 +100,9 @@ static SeekableInputStream stream(org.apache.iceberg.io.SeekableInputStream stre return HadoopStreams.wrap((FSDataInputStream) wrapped); } } + if (stream instanceof RangeReadable) { + return new ParquetRangeReadableInputStreamAdapter(stream); + } return new ParquetInputStreamAdapter(stream); } @@ -123,6 +135,64 @@ public void seek(long newPos) throws IOException { } } + private static class ParquetRangeReadableInputStreamAdapter< + T extends org.apache.iceberg.io.SeekableInputStream & RangeReadable> + extends DelegatingSeekableInputStream implements RangeReadable { + private final T delegate; + + private ParquetRangeReadableInputStreamAdapter(T delegate) { + super(delegate); + this.delegate = delegate; + } + + @Override + public long getPos() throws IOException { + return delegate.getPos(); + } + + @Override + public void seek(long newPos) throws IOException { + delegate.seek(newPos); + } + + @Override + public void readFully(long position, byte[] buffer, int offset, int length) throws IOException { + delegate.readFully(position, buffer, offset, length); + } + + @Override + public int readTail(byte[] buffer, int offset, int length) throws IOException { + return delegate.readTail(buffer, offset, length); + } + + @Override + public boolean readVectoredAvailable(ByteBufferAllocator allocate) { + return true; + } + + @Override + public void readVectored(List ranges, ByteBufferAllocator allocate) + throws IOException { + IntFunction delegateAllocate = (allocate::allocate); + List delegateRange = convertRanges(ranges); + delegate.readVectored(delegateRange, delegateAllocate); + } + + private static List convertRanges(List ranges) { + return ranges.stream() + .map( + parquetFileRange -> { + CompletableFuture result = new CompletableFuture<>(); + parquetFileRange.setDataReadFuture(result); + return new FileRange( + parquetFileRange.getDataReadFuture(), + parquetFileRange.getOffset(), + parquetFileRange.getLength()); + }) + .collect(Collectors.toList()); + } + } + private static class ParquetOutputStreamAdapter extends DelegatingPositionOutputStream { private final org.apache.iceberg.io.PositionOutputStream delegate; From 65ce5a7c6080577b4638210af1417eac74279963 Mon Sep 17 00:00:00 2001 From: stubz151 Date: Fri, 12 Sep 2025 16:47:34 +0100 Subject: [PATCH 2/3] fix: addressing comments on PR --- .../java/org/apache/iceberg/io/FileRange.java | 11 +- .../org/apache/iceberg/io/RangeReadable.java | 54 +++++-- .../iceberg/util/VectoredReadUtils.java | 120 ---------------- .../apache/iceberg/aws/s3/TestS3FileIO.java | 133 ++++++++++++++++++ .../org/apache/iceberg/parquet/ParquetIO.java | 21 ++- 5 files changed, 200 insertions(+), 139 deletions(-) delete mode 100644 api/src/main/java/org/apache/iceberg/util/VectoredReadUtils.java diff --git a/api/src/main/java/org/apache/iceberg/io/FileRange.java b/api/src/main/java/org/apache/iceberg/io/FileRange.java index 4fcaece0245a..9dbedf7f6570 100644 --- a/api/src/main/java/org/apache/iceberg/io/FileRange.java +++ b/api/src/main/java/org/apache/iceberg/io/FileRange.java @@ -18,15 +18,24 @@ */ package org.apache.iceberg.io; +import java.io.EOFException; import java.nio.ByteBuffer; import java.util.concurrent.CompletableFuture; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; public class FileRange { private final CompletableFuture byteBuffer; private final long offset; private final int length; - public FileRange(CompletableFuture byteBuffer, long offset, int length) { + public FileRange(CompletableFuture byteBuffer, long offset, int length) + throws EOFException { + Preconditions.checkNotNull(offset, "offset is null"); + Preconditions.checkNotNull(length, "length is null"); + Preconditions.checkArgument(length() >= 0, "length %s is negative ", length); + if (offset < 0) { + throw new EOFException("position is negative in range: " + offset); + } this.byteBuffer = byteBuffer; this.offset = offset; this.length = length; diff --git a/api/src/main/java/org/apache/iceberg/io/RangeReadable.java b/api/src/main/java/org/apache/iceberg/io/RangeReadable.java index d734e92a2ce6..ecc0e84a2d29 100644 --- a/api/src/main/java/org/apache/iceberg/io/RangeReadable.java +++ b/api/src/main/java/org/apache/iceberg/io/RangeReadable.java @@ -21,9 +21,11 @@ import java.io.Closeable; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Comparator; import java.util.List; import java.util.function.IntFunction; -import org.apache.iceberg.util.VectoredReadUtils; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; /** * {@code RangeReadable} is an interface that allows for implementations of {@link InputFile} @@ -82,16 +84,6 @@ default int readTail(byte[] buffer) throws IOException { return readTail(buffer, 0, buffer.length); } - /** - * Is the {@link #readVectored(List, IntFunction)} method available? - * - * @param allocate the allocator to use for allocating ByteBuffers - * @return True if the operation is considered available for this allocator. - */ - default boolean readVectoredAvailable(IntFunction allocate) { - return true; - } - /** * Read fully a list of file ranges asynchronously from this file. As a result of the call, each * range will have FileRange.setData(CompletableFuture) called with a future that when complete @@ -111,11 +103,49 @@ default boolean readVectoredAvailable(IntFunction allocate) { */ default void readVectored(List ranges, IntFunction allocate) throws IOException { - List validatedRanges = VectoredReadUtils.validateAndSortRanges(ranges); + List validatedRanges = sortRanges(ranges); for (FileRange range : validatedRanges) { ByteBuffer buffer = allocate.apply(range.length()); readFully(range.offset(), buffer.array()); range.byteBuffer().complete(buffer); } } + + static List sortRanges(final List input) { + Preconditions.checkNotNull(input, "Null input list"); + + final List sortedRanges; + + // 2 because the input size can be 0/1, and then we want to skip sorting. + if (input.size() < 2) { + sortedRanges = input; + } else { + sortedRanges = sortRangeList(input); + FileRange prev = null; + for (final FileRange current : sortedRanges) { + if (prev != null) { + Preconditions.checkArgument( + current.offset() >= prev.offset() + prev.length(), + "Overlapping ranges %s and %s", + prev, + current); + } + prev = current; + } + } + + return sortedRanges; + } + + /** + * Sort the input ranges by offset; no validation is done. + * + * @param input input ranges. + * @return a new list of the ranges, sorted by offset. + */ + static List sortRangeList(List input) { + final List l = Lists.newArrayList(input); + l.sort(Comparator.comparingLong(FileRange::offset)); + return l; + } } diff --git a/api/src/main/java/org/apache/iceberg/util/VectoredReadUtils.java b/api/src/main/java/org/apache/iceberg/util/VectoredReadUtils.java deleted file mode 100644 index 41abf2d27e4e..000000000000 --- a/api/src/main/java/org/apache/iceberg/util/VectoredReadUtils.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg.util; - -import java.io.EOFException; -import java.util.Comparator; -import java.util.List; -import org.apache.iceberg.io.FileRange; -import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Utils class for vectoredReads, to help with things like range validation. Most of the code in - * this class is written by @mukundthakur, and taken from - * /hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java (thank you!). - */ -public final class VectoredReadUtils { - private VectoredReadUtils() {} - - private static final Logger LOG = LoggerFactory.getLogger(VectoredReadUtils.class); - - /** - * Validate a list of ranges (including overlapping checks) and return the sorted list. - * - *

Two ranges overlap when the start offset of second is less than the end offset of first. End - * offset is calculated as start offset + length. - * - * @param input input list - * @return a new sorted list. - * @throws IllegalArgumentException if there are overlapping ranges or a range element is invalid - * (other than with negative offset) - * @throws EOFException if the last range extends beyond the end of the file supplied or a range - * offset is negative - */ - public static List validateAndSortRanges(final List input) - throws EOFException { - - Preconditions.checkNotNull(input, "Null input list"); - - if (input.isEmpty()) { - // this may seem a pathological case, but it was valid - // before and somehow Spark can call it through parquet. - LOG.debug("Empty input list"); - return input; - } - - final List sortedRanges; - - if (input.size() == 1) { - validateRangeRequest(input.get(0)); - sortedRanges = input; - } else { - sortedRanges = sortRangeList(input); - FileRange prev = null; - for (final FileRange current : sortedRanges) { - validateRangeRequest(current); - if (prev != null) { - Preconditions.checkArgument( - current.offset() >= prev.offset() + prev.length(), - "Overlapping ranges %s and %s", - prev, - current); - } - prev = current; - } - } - - return sortedRanges; - } - - /** - * Validate a single range. - * - * @param range range to validate. - * @return the range. - * @throws IllegalArgumentException the range length is negative or other invalid condition is met - * other than the those which raise EOFException or NullPointerException. - * @throws EOFException the range offset is negative - * @throws NullPointerException if the range is null. - */ - public static FileRange validateRangeRequest(FileRange range) throws EOFException { - Preconditions.checkNotNull(range, "range is null"); - - Preconditions.checkArgument(range.length() >= 0, "length is negative in %s", range); - if (range.offset() < 0) { - throw new EOFException("position is negative in range " + range); - } - return range; - } - - /** - * Sort the input ranges by offset; no validation is done. - * - * @param input input ranges. - * @return a new list of the ranges, sorted by offset. - */ - public static List sortRangeList(List input) { - final List l = Lists.newArrayList(input); - l.sort(Comparator.comparingLong(FileRange::offset)); - return l; - } -} diff --git a/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIO.java b/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIO.java index ebee07e53e4c..03f4e12e7870 100644 --- a/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIO.java +++ b/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIO.java @@ -41,6 +41,8 @@ import java.util.Spliterator; import java.util.Spliterators; import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.function.IntFunction; import java.util.stream.Collectors; import java.util.stream.Stream; import java.util.stream.StreamSupport; @@ -66,9 +68,11 @@ import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.FileIOParser; import org.apache.iceberg.io.FileInfo; +import org.apache.iceberg.io.FileRange; import org.apache.iceberg.io.IOUtil; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.io.RangeReadable; import org.apache.iceberg.io.ResolvingFileIO; import org.apache.iceberg.io.StorageCredential; import org.apache.iceberg.jdbc.JdbcCatalog; @@ -966,6 +970,135 @@ public void noStorageCredentialConfiguredWithoutCredentialsInProperties() { .hasMessageContaining("Unable to load credentials from any of the providers"); } + @Test + public void testVectoredRead() throws Exception { + String location = "s3://bucket/path/to/vectored-read.dat"; + int dataSize = 1024 * 1024; + + byte[] expected = new byte[dataSize]; + random.nextBytes(expected); + + InputFile inputFile = s3FileIO.newInputFile(location); + assertThat(inputFile.exists()).isFalse(); + + OutputFile out = s3FileIO.newOutputFile(location); + try (OutputStream os = out.createOrOverwrite()) { + IOUtil.writeFully(os, ByteBuffer.wrap(expected)); + } + + try (InputStream inputStream = inputFile.newStream()) { + assertThat(inputStream instanceof RangeReadable); + RangeReadable in = (RangeReadable) inputStream; + + IntFunction allocate = ByteBuffer::allocate; + + List ranges = Lists.newArrayList(); + CompletableFuture future1 = new CompletableFuture<>(); + CompletableFuture future2 = new CompletableFuture<>(); + CompletableFuture future3 = new CompletableFuture<>(); + + // First range: first 1024 bytes + int range1Offset = 0; + int range1Length = 1024; + ranges.add(new FileRange(future1, range1Offset, range1Length)); + + // Second range: middle 2048 bytes + int range2Offset = dataSize / 2; + int range2Length = 2048; + ranges.add(new FileRange(future2, range2Offset, range2Length)); + + // Third range: last 1024 bytes + int range3Offset = dataSize - 1024; + int range3Length = 1024; + ranges.add(new FileRange(future3, range3Offset, range3Length)); + + in.readVectored(ranges, allocate); + + ByteBuffer buffer1 = future1.get(); + ByteBuffer buffer2 = future2.get(); + ByteBuffer buffer3 = future3.get(); + + assertThat(future1.isDone()).isTrue(); + assertThat(future2.isDone()).isTrue(); + assertThat(future3.isDone()).isTrue(); + + assertThat(buffer1.limit()).isEqualTo(range1Length); + assertThat(buffer2.limit()).isEqualTo(range2Length); + assertThat(buffer3.limit()).isEqualTo(range3Length); + + byte[] range1Data = new byte[range1Length]; + byte[] range2Data = new byte[range2Length]; + byte[] range3Data = new byte[range3Length]; + + buffer1.get(range1Data); + buffer2.get(range2Data); + buffer3.get(range3Data); + } + } + + @Test + public void testVectoredReadWithNonContinuousRanges() throws Exception { + + String location = "s3://bucket/path/to/vectored-read-overlapping.dat"; + int dataSize = 1024 * 1024; + + byte[] expected = new byte[dataSize]; + random.nextBytes(expected); + + InputFile inputFile = s3FileIO.newInputFile(location); + assertThat(inputFile.exists()).isFalse(); + + OutputFile out = s3FileIO.newOutputFile(location); + try (OutputStream os = out.createOrOverwrite()) { + IOUtil.writeFully(os, ByteBuffer.wrap(expected)); + } + + try (InputStream inputStream = inputFile.newStream()) { + assertThat(inputStream instanceof RangeReadable); + RangeReadable in = (RangeReadable) inputStream; + List ranges = Lists.newArrayList(); + CompletableFuture future1 = new CompletableFuture<>(); + CompletableFuture future2 = new CompletableFuture<>(); + + // First range: 0-1024 + int range1Offset = 0; + int range1Length = 1024; + ranges.add(new FileRange(future1, range1Offset, range1Length)); + + // Second range: 2000-3400 + int range2Offset = 2000; + int range2Length = 3400; + ranges.add(new FileRange(future2, range2Offset, range2Length)); + + // Call readVectored + IntFunction allocate = ByteBuffer::allocate; + in.readVectored(ranges, allocate); + + // Verify the buffers have the expected content + ByteBuffer buffer1 = future1.get(); + ByteBuffer buffer2 = future2.get(); + + // Verify the futures were completed + assertThat(future1.isDone()).isTrue(); + assertThat(future2.isDone()).isTrue(); + + assertThat(buffer1.limit()).isEqualTo(range1Length); + assertThat(buffer2.limit()).isEqualTo(range2Length); + + // Verify the buffer content matches the original data + byte[] range1Data = new byte[range1Length]; + byte[] range2Data = new byte[range2Length]; + + buffer1.get(range1Data); + buffer2.get(range2Data); + + assertThat(range1Data) + .isEqualTo(Arrays.copyOfRange(expected, range1Offset, range1Offset + range1Length)); + assertThat(range2Data) + .isEqualTo(Arrays.copyOfRange(expected, range2Offset, range2Offset + range2Length)); + } + } + private void createRandomObjects(String prefix, int count) { S3URI s3URI = new S3URI(prefix); diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIO.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIO.java index af6d451c3803..8b9a77b59caa 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIO.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIO.java @@ -18,6 +18,7 @@ */ package org.apache.iceberg.parquet; +import java.io.EOFException; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -182,12 +183,20 @@ private static List convertRanges(List ranges) { return ranges.stream() .map( parquetFileRange -> { - CompletableFuture result = new CompletableFuture<>(); - parquetFileRange.setDataReadFuture(result); - return new FileRange( - parquetFileRange.getDataReadFuture(), - parquetFileRange.getOffset(), - parquetFileRange.getLength()); + CompletableFuture future = new CompletableFuture<>(); + parquetFileRange.setDataReadFuture(future); + try { + return new FileRange( + parquetFileRange.getDataReadFuture(), + parquetFileRange.getOffset(), + parquetFileRange.getLength()); + } catch (EOFException e) { + throw new RuntimeIOException( + e, + "Failed to create range file for offset: %s and length: %s", + parquetFileRange.getOffset(), + parquetFileRange.getLength()); + } }) .collect(Collectors.toList()); } From 180613d0888bd4c232e62e1dada456350bb2606b Mon Sep 17 00:00:00 2001 From: stubz151 Date: Tue, 23 Sep 2025 17:53:22 +0100 Subject: [PATCH 3/3] responding to pr feedback --- .../java/org/apache/iceberg/io/FileRange.java | 12 +- .../org/apache/iceberg/io/RangeReadable.java | 24 +-- .../apache/iceberg/aws/s3/TestS3FileIO.java | 133 ---------------- .../org/apache/iceberg/parquet/ParquetIO.java | 8 +- ...arquetRangeReadableInputStreamAdapter.java | 147 ++++++++++++++++++ 5 files changed, 167 insertions(+), 157 deletions(-) create mode 100644 parquet/src/test/java/org/apache/iceberg/parquet/TestParquetRangeReadableInputStreamAdapter.java diff --git a/api/src/main/java/org/apache/iceberg/io/FileRange.java b/api/src/main/java/org/apache/iceberg/io/FileRange.java index 9dbedf7f6570..f6d5d9b41cca 100644 --- a/api/src/main/java/org/apache/iceberg/io/FileRange.java +++ b/api/src/main/java/org/apache/iceberg/io/FileRange.java @@ -30,12 +30,12 @@ public class FileRange { public FileRange(CompletableFuture byteBuffer, long offset, int length) throws EOFException { - Preconditions.checkNotNull(offset, "offset is null"); - Preconditions.checkNotNull(length, "length is null"); - Preconditions.checkArgument(length() >= 0, "length %s is negative ", length); - if (offset < 0) { - throw new EOFException("position is negative in range: " + offset); - } + Preconditions.checkNotNull(byteBuffer, "byteBuffer can't be null"); + Preconditions.checkArgument( + length() >= 0, "Invalid length: %s in range (must be >= 0)", length); + Preconditions.checkArgument( + offset() >= 0, "Invalid offset: %s in range (must be >= 0)", offset); + this.byteBuffer = byteBuffer; this.offset = offset; this.length = length; diff --git a/api/src/main/java/org/apache/iceberg/io/RangeReadable.java b/api/src/main/java/org/apache/iceberg/io/RangeReadable.java index ecc0e84a2d29..d860a5b76129 100644 --- a/api/src/main/java/org/apache/iceberg/io/RangeReadable.java +++ b/api/src/main/java/org/apache/iceberg/io/RangeReadable.java @@ -24,8 +24,8 @@ import java.util.Comparator; import java.util.List; import java.util.function.IntFunction; +import java.util.stream.Collectors; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; /** * {@code RangeReadable} is an interface that allows for implementations of {@link InputFile} @@ -99,7 +99,7 @@ default int readTail(byte[] buffer) throws IOException { * @param ranges the byte ranges to read * @param allocate the function to allocate ByteBuffer * @throws IOException any IOE. - * @throws IllegalArgumentException if the any of ranges are invalid, or they overlap. + * @throws IllegalArgumentException if any of ranges are invalid, or they overlap. */ default void readVectored(List ranges, IntFunction allocate) throws IOException { @@ -112,7 +112,7 @@ default void readVectored(List ranges, IntFunction alloca } static List sortRanges(final List input) { - Preconditions.checkNotNull(input, "Null input list"); + Preconditions.checkNotNull(input, "Input list can't be null"); final List sortedRanges; @@ -120,7 +120,10 @@ static List sortRanges(final List input) { if (input.size() < 2) { sortedRanges = input; } else { - sortedRanges = sortRangeList(input); + sortedRanges = + input.stream() + .sorted(Comparator.comparingLong(FileRange::offset)) + .collect(Collectors.toList()); FileRange prev = null; for (final FileRange current : sortedRanges) { if (prev != null) { @@ -130,22 +133,11 @@ static List sortRanges(final List input) { prev, current); } + prev = current; } } return sortedRanges; } - - /** - * Sort the input ranges by offset; no validation is done. - * - * @param input input ranges. - * @return a new list of the ranges, sorted by offset. - */ - static List sortRangeList(List input) { - final List l = Lists.newArrayList(input); - l.sort(Comparator.comparingLong(FileRange::offset)); - return l; - } } diff --git a/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIO.java b/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIO.java index 03f4e12e7870..ebee07e53e4c 100644 --- a/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIO.java +++ b/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIO.java @@ -41,8 +41,6 @@ import java.util.Spliterator; import java.util.Spliterators; import java.util.UUID; -import java.util.concurrent.CompletableFuture; -import java.util.function.IntFunction; import java.util.stream.Collectors; import java.util.stream.Stream; import java.util.stream.StreamSupport; @@ -68,11 +66,9 @@ import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.FileIOParser; import org.apache.iceberg.io.FileInfo; -import org.apache.iceberg.io.FileRange; import org.apache.iceberg.io.IOUtil; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; -import org.apache.iceberg.io.RangeReadable; import org.apache.iceberg.io.ResolvingFileIO; import org.apache.iceberg.io.StorageCredential; import org.apache.iceberg.jdbc.JdbcCatalog; @@ -970,135 +966,6 @@ public void noStorageCredentialConfiguredWithoutCredentialsInProperties() { .hasMessageContaining("Unable to load credentials from any of the providers"); } - @Test - public void testVectoredRead() throws Exception { - String location = "s3://bucket/path/to/vectored-read.dat"; - int dataSize = 1024 * 1024; - - byte[] expected = new byte[dataSize]; - random.nextBytes(expected); - - InputFile inputFile = s3FileIO.newInputFile(location); - assertThat(inputFile.exists()).isFalse(); - - OutputFile out = s3FileIO.newOutputFile(location); - try (OutputStream os = out.createOrOverwrite()) { - IOUtil.writeFully(os, ByteBuffer.wrap(expected)); - } - - try (InputStream inputStream = inputFile.newStream()) { - assertThat(inputStream instanceof RangeReadable); - RangeReadable in = (RangeReadable) inputStream; - - IntFunction allocate = ByteBuffer::allocate; - - List ranges = Lists.newArrayList(); - CompletableFuture future1 = new CompletableFuture<>(); - CompletableFuture future2 = new CompletableFuture<>(); - CompletableFuture future3 = new CompletableFuture<>(); - - // First range: first 1024 bytes - int range1Offset = 0; - int range1Length = 1024; - ranges.add(new FileRange(future1, range1Offset, range1Length)); - - // Second range: middle 2048 bytes - int range2Offset = dataSize / 2; - int range2Length = 2048; - ranges.add(new FileRange(future2, range2Offset, range2Length)); - - // Third range: last 1024 bytes - int range3Offset = dataSize - 1024; - int range3Length = 1024; - ranges.add(new FileRange(future3, range3Offset, range3Length)); - - in.readVectored(ranges, allocate); - - ByteBuffer buffer1 = future1.get(); - ByteBuffer buffer2 = future2.get(); - ByteBuffer buffer3 = future3.get(); - - assertThat(future1.isDone()).isTrue(); - assertThat(future2.isDone()).isTrue(); - assertThat(future3.isDone()).isTrue(); - - assertThat(buffer1.limit()).isEqualTo(range1Length); - assertThat(buffer2.limit()).isEqualTo(range2Length); - assertThat(buffer3.limit()).isEqualTo(range3Length); - - byte[] range1Data = new byte[range1Length]; - byte[] range2Data = new byte[range2Length]; - byte[] range3Data = new byte[range3Length]; - - buffer1.get(range1Data); - buffer2.get(range2Data); - buffer3.get(range3Data); - } - } - - @Test - public void testVectoredReadWithNonContinuousRanges() throws Exception { - - String location = "s3://bucket/path/to/vectored-read-overlapping.dat"; - int dataSize = 1024 * 1024; - - byte[] expected = new byte[dataSize]; - random.nextBytes(expected); - - InputFile inputFile = s3FileIO.newInputFile(location); - assertThat(inputFile.exists()).isFalse(); - - OutputFile out = s3FileIO.newOutputFile(location); - try (OutputStream os = out.createOrOverwrite()) { - IOUtil.writeFully(os, ByteBuffer.wrap(expected)); - } - - try (InputStream inputStream = inputFile.newStream()) { - assertThat(inputStream instanceof RangeReadable); - RangeReadable in = (RangeReadable) inputStream; - List ranges = Lists.newArrayList(); - CompletableFuture future1 = new CompletableFuture<>(); - CompletableFuture future2 = new CompletableFuture<>(); - - // First range: 0-1024 - int range1Offset = 0; - int range1Length = 1024; - ranges.add(new FileRange(future1, range1Offset, range1Length)); - - // Second range: 2000-3400 - int range2Offset = 2000; - int range2Length = 3400; - ranges.add(new FileRange(future2, range2Offset, range2Length)); - - // Call readVectored - IntFunction allocate = ByteBuffer::allocate; - in.readVectored(ranges, allocate); - - // Verify the buffers have the expected content - ByteBuffer buffer1 = future1.get(); - ByteBuffer buffer2 = future2.get(); - - // Verify the futures were completed - assertThat(future1.isDone()).isTrue(); - assertThat(future2.isDone()).isTrue(); - - assertThat(buffer1.limit()).isEqualTo(range1Length); - assertThat(buffer2.limit()).isEqualTo(range2Length); - - // Verify the buffer content matches the original data - byte[] range1Data = new byte[range1Length]; - byte[] range2Data = new byte[range2Length]; - - buffer1.get(range1Data); - buffer2.get(range2Data); - - assertThat(range1Data) - .isEqualTo(Arrays.copyOfRange(expected, range1Offset, range1Offset + range1Length)); - assertThat(range2Data) - .isEqualTo(Arrays.copyOfRange(expected, range2Offset, range2Offset + range2Length)); - } - } - private void createRandomObjects(String prefix, int count) { S3URI s3URI = new S3URI(prefix); diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIO.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIO.java index 8b9a77b59caa..e042504ca103 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIO.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIO.java @@ -37,6 +37,7 @@ import org.apache.iceberg.io.DelegatingOutputStream; import org.apache.iceberg.io.FileRange; import org.apache.iceberg.io.RangeReadable; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.parquet.bytes.ByteBufferAllocator; import org.apache.parquet.hadoop.util.HadoopStreams; import org.apache.parquet.io.DelegatingPositionOutputStream; @@ -101,9 +102,11 @@ static SeekableInputStream stream(org.apache.iceberg.io.SeekableInputStream stre return HadoopStreams.wrap((FSDataInputStream) wrapped); } } + if (stream instanceof RangeReadable) { return new ParquetRangeReadableInputStreamAdapter(stream); } + return new ParquetInputStreamAdapter(stream); } @@ -136,12 +139,13 @@ public void seek(long newPos) throws IOException { } } - private static class ParquetRangeReadableInputStreamAdapter< + @VisibleForTesting + static class ParquetRangeReadableInputStreamAdapter< T extends org.apache.iceberg.io.SeekableInputStream & RangeReadable> extends DelegatingSeekableInputStream implements RangeReadable { private final T delegate; - private ParquetRangeReadableInputStreamAdapter(T delegate) { + ParquetRangeReadableInputStreamAdapter(T delegate) { super(delegate); this.delegate = delegate; } diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetRangeReadableInputStreamAdapter.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetRangeReadableInputStreamAdapter.java new file mode 100644 index 000000000000..fbf26a48eb68 --- /dev/null +++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetRangeReadableInputStreamAdapter.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.parquet; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.List; +import java.util.function.IntFunction; +import org.apache.iceberg.io.FileRange; +import org.apache.iceberg.io.RangeReadable; +import org.apache.iceberg.io.SeekableInputStream; +import org.apache.parquet.bytes.ByteBufferAllocator; +import org.apache.parquet.io.ParquetFileRange; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +public class TestParquetRangeReadableInputStreamAdapter { + + @Test + public void testRangeReadableAdapterReadVectoredAvailable() { + MockRangeReadableStream mockStream = new MockRangeReadableStream(); + ParquetIO.ParquetRangeReadableInputStreamAdapter adapter = + new ParquetIO.ParquetRangeReadableInputStreamAdapter<>(mockStream); + ByteBufferAllocator allocator = Mockito.mock(ByteBufferAllocator.class); + + boolean result = adapter.readVectoredAvailable(allocator); + + assertThat(result).isTrue(); + } + + @Test + public void testRangeReadableAdapterReadVectored() throws IOException { + int length1 = 100; + int length2 = 50; + MockRangeReadableStream mockStream = new MockRangeReadableStream(); + ParquetIO.ParquetRangeReadableInputStreamAdapter adapter = + new ParquetIO.ParquetRangeReadableInputStreamAdapter<>(mockStream); + + ByteBufferAllocator allocator = Mockito.mock(ByteBufferAllocator.class); + when(allocator.allocate(length1)).thenReturn(ByteBuffer.wrap(new byte[length1])); + when(allocator.allocate(length2)).thenReturn(ByteBuffer.wrap(new byte[length2])); + + ParquetFileRange range1 = new ParquetFileRange(0, length1); + ParquetFileRange range2 = new ParquetFileRange(200, length2); + List ranges = Arrays.asList(range1, range2); + + adapter.readVectored(ranges, allocator); + + verify(allocator).allocate(length1); + verify(allocator).allocate(length2); + + assertThat(range1.getDataReadFuture().isDone()).isTrue(); + assertThat(range2.getDataReadFuture().isDone()).isTrue(); + + ByteBuffer buffer1 = range1.getDataReadFuture().join(); + ByteBuffer buffer2 = range2.getDataReadFuture().join(); + + // Verify data content from byte arrays + byte[] array1 = new byte[length1]; + byte[] array2 = new byte[length2]; + buffer1.get(array1); + buffer2.get(array2); + + byte[] expected1 = new byte[length1]; + byte[] expected2 = new byte[length2]; + + for (int i = 0; i < length1; i++) { + expected1[i] = (byte) i; + } + + for (int i = 0; i < length2; i++) { + expected2[i] = (byte) (200 + i); + } + + assertThat(array1).isEqualTo(expected1); + assertThat(array2).isEqualTo(expected2); + } + + private static class MockRangeReadableStream extends SeekableInputStream + implements RangeReadable { + + @Override + public long getPos() throws IOException { + return 0; + } + + @Override + public void seek(long newPos) throws IOException { + // no-op + } + + @Override + public int read() throws IOException { + return -1; + } + + @Override + public void readFully(long position, byte[] buffer, int offset, int length) throws IOException { + // no-op + } + + @Override + public int readTail(byte[] buffer, int offset, int length) throws IOException { + return -1; + } + + @Override + public void readVectored(List ranges, IntFunction allocate) + throws IOException { + for (FileRange range : ranges) { + try { + ByteBuffer buffer = allocate.apply(range.length()); + + for (int i = 0; i < range.length(); i++) { + buffer.put((byte) (range.offset() + i)); + } + + buffer.flip(); + range.byteBuffer().complete(buffer); + } catch (Exception e) { + range.byteBuffer().completeExceptionally(e); + } + } + } + } +}