55 changes: 55 additions & 0 deletions api/src/main/java/org/apache/iceberg/io/FileRange.java
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.io;

import java.io.EOFException;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

public class FileRange {
private final CompletableFuture<ByteBuffer> byteBuffer;
private final long offset;
private final int length;

public FileRange(CompletableFuture<ByteBuffer> byteBuffer, long offset, int length)
throws EOFException {
Preconditions.checkNotNull(byteBuffer, "byteBuffer can't be null");
Preconditions.checkArgument(
length >= 0, "Invalid length: %s in range (must be >= 0)", length);
Preconditions.checkArgument(
offset >= 0, "Invalid offset: %s in range (must be >= 0)", offset);

this.byteBuffer = byteBuffer;
this.offset = offset;
this.length = length;
}

public CompletableFuture<ByteBuffer> byteBuffer() {
return byteBuffer;
}

public long offset() {
return offset;
}

public int length() {
return length;
}
}
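
A minimal usage sketch for the new class (hypothetical caller code, not part of this change; assumes the calling method handles the checked EOFException declared by the constructor):

// Hypothetical caller: construct a range over the first 1 KB and wait for the data
// once a RangeReadable implementation completes the future.
CompletableFuture<ByteBuffer> future = new CompletableFuture<>();
FileRange range = new FileRange(future, 0L, 1024);
// ... hand the range to a reader that completes range.byteBuffer() ...
ByteBuffer data = range.byteBuffer().join(); // blocks until the read finishes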
63 changes: 63 additions & 0 deletions api/src/main/java/org/apache/iceberg/io/RangeReadable.java
@@ -20,6 +20,12 @@

import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Comparator;
import java.util.List;
import java.util.function.IntFunction;
import java.util.stream.Collectors;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

/**
* {@code RangeReadable} is an interface that allows for implementations of {@link InputFile}
@@ -77,4 +83,61 @@ default void readFully(long position, byte[] buffer) throws IOException {
default int readTail(byte[] buffer) throws IOException {
return readTail(buffer, 0, buffer.length);
}

/**
* Read fully a list of file ranges asynchronously from this file. As a result of the call, each
* range's {@link FileRange#byteBuffer()} future will be completed with a ByteBuffer containing
* the data from that range of the file.
*
* <p>The position returned by getPos() after readVectored() is undefined.
*
* <p>If the file is changed while a readVectored() operation is in progress, the output is
* undefined. Some ranges may have old data, some may have new data, and some may have both.
*
* <p>While a readVectored() operation is in progress, normal read API calls may block.
*
* @param ranges the byte ranges to read
* @param allocate the function used to allocate each ByteBuffer
* @throws IOException if an I/O error occurs
* @throws IllegalArgumentException if any of the ranges are invalid, or if they overlap
*/
default void readVectored(List<FileRange> ranges, IntFunction<ByteBuffer> allocate)
throws IOException {
List<FileRange> validatedRanges = sortRanges(ranges);
for (FileRange range : validatedRanges) {
ByteBuffer buffer = allocate.apply(range.length());
readFully(range.offset(), buffer.array());
range.byteBuffer().complete(buffer);
}
}

static List<FileRange> sortRanges(final List<FileRange> input) {
Preconditions.checkNotNull(input, "Input list can't be null");

final List<FileRange> sortedRanges;

// If the input has fewer than 2 ranges there is nothing to sort or validate.
if (input.size() < 2) {
sortedRanges = input;
} else {
sortedRanges =
input.stream()
.sorted(Comparator.comparingLong(FileRange::offset))
.collect(Collectors.toList());
FileRange prev = null;
for (final FileRange current : sortedRanges) {
if (prev != null) {
Preconditions.checkArgument(
current.offset() >= prev.offset() + prev.length(),
"Overlapping ranges %s and %s",
prev,
current);
}

prev = current;
}
}

return sortedRanges;
}
}
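
A rough sketch of how a caller could drive the new default readVectored, assuming `in` is some RangeReadable stream (a hypothetical variable), heap buffers come from ByteBuffer::allocate, and the usual imports (java.util.Arrays, the checked EOFException handling) are in place:

// Hypothetical caller: read two non-overlapping ranges in one vectored call.
List<FileRange> ranges =
    Arrays.asList(
        new FileRange(new CompletableFuture<>(), 0L, 4096),
        new FileRange(new CompletableFuture<>(), 8192L, 4096));
in.readVectored(ranges, ByteBuffer::allocate); // default impl falls back to readFully per range
ByteBuffer first = ranges.get(0).byteBuffer().join();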
@@ -1340,6 +1340,7 @@ public <D> CloseableIterable<D> build() {
optionsBuilder.withDecryption(fileDecryptionProperties);
}

optionsBuilder.withUseHadoopVectoredIo(true);
Contributor comment:
There were some efforts to allow Iceberg to work without Hadoop on the classpath. I'm not sure how far these efforts went, and I'm also not sure how this change will affect them.

Could you please help me understand the consequences of always using withUseHadoopVectoredIo?

Thanks,
Peter

@stubz151 (Contributor Author), Sep 23, 2025:
For part 1, the effort to reduce the dependencies on Hadoop: I don't think that was ever completed; I do see a TODO comment about wanting to do it. I am probably making that effort slightly more complicated since I am adding two new imports from Hadoop, but I don't think that is a big risk.

For part 2, withUseHadoopVectoredIo is used in the file reader in conjunction with readVectoredAvailable(), so always enabling it doesn't change anything unless the stream also supports readVectored():
https://github.com/apache/parquet-java/blob/f50dd6cb4b526cf4b585993c1b69a838cd8151f3/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java#L1303
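
(For context, the gating in parquet-java looks roughly like the sketch below; the option getter name and surrounding structure are approximated from the linked ParquetFileReader and are illustrative, not verbatim.)

// Approximate sketch: Parquet only issues a vectored read when the option is on
// AND the stream advertises support; otherwise it falls back to per-range reads.
if (options.useHadoopVectoredIo() && stream.readVectoredAvailable(allocator)) {
  stream.readVectored(parquetRanges, allocator);
} else {
  // read each range individually
}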

Contributor comment:
I think the naming of this option is a little misleading. As @stubz151 mentions, withUseHadoopVectoredIo doesn't necessarily depend on Hadoop; rather, it enables the vectored IO behavior in Parquet.

ParquetReadOptions options = optionsBuilder.build();

NameMapping mapping;
83 changes: 83 additions & 0 deletions parquet/src/main/java/org/apache/iceberg/parquet/ParquetIO.java
@@ -18,9 +18,15 @@
*/
package org.apache.iceberg.parquet;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.IntFunction;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -29,11 +35,16 @@
import org.apache.iceberg.hadoop.HadoopOutputFile;
import org.apache.iceberg.io.DelegatingInputStream;
import org.apache.iceberg.io.DelegatingOutputStream;
import org.apache.iceberg.io.FileRange;
import org.apache.iceberg.io.RangeReadable;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.parquet.bytes.ByteBufferAllocator;
import org.apache.parquet.hadoop.util.HadoopStreams;
import org.apache.parquet.io.DelegatingPositionOutputStream;
import org.apache.parquet.io.DelegatingSeekableInputStream;
import org.apache.parquet.io.InputFile;
import org.apache.parquet.io.OutputFile;
import org.apache.parquet.io.ParquetFileRange;
import org.apache.parquet.io.PositionOutputStream;
import org.apache.parquet.io.SeekableInputStream;

@@ -91,6 +102,11 @@ static SeekableInputStream stream(org.apache.iceberg.io.SeekableInputStream stre
return HadoopStreams.wrap((FSDataInputStream) wrapped);
}
}

if (stream instanceof RangeReadable) {
return new ParquetRangeReadableInputStreamAdapter(stream);
}

return new ParquetInputStreamAdapter(stream);
}

@@ -123,6 +139,73 @@ public void seek(long newPos) throws IOException {
}
}

@VisibleForTesting
static class ParquetRangeReadableInputStreamAdapter<
T extends org.apache.iceberg.io.SeekableInputStream & RangeReadable>
extends DelegatingSeekableInputStream implements RangeReadable {
private final T delegate;

ParquetRangeReadableInputStreamAdapter(T delegate) {
super(delegate);
this.delegate = delegate;
}

@Override
public long getPos() throws IOException {
return delegate.getPos();
}

@Override
public void seek(long newPos) throws IOException {
delegate.seek(newPos);
}

@Override
public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
delegate.readFully(position, buffer, offset, length);
}

@Override
public int readTail(byte[] buffer, int offset, int length) throws IOException {
return delegate.readTail(buffer, offset, length);
}

@Override
public boolean readVectoredAvailable(ByteBufferAllocator allocate) {
return true;
}

@Override
public void readVectored(List<ParquetFileRange> ranges, ByteBufferAllocator allocate)
Contributor comment:

Can we add some tests at the ParquetIO level to validate this? I know we're adding some in S3FileIO, but it would be good to have this interface tested (even if there's a mock implementation)

Contributor Author replied:
I added testRangeReadableAdapterReadVectored, which does something similar to the tests in S3FileIO but focuses a bit more on checking that the buffers/ranges are being used correctly. I skipped the other operations but can add them if we want; let me know.

throws IOException {
IntFunction<ByteBuffer> delegateAllocate = (allocate::allocate);
List<FileRange> delegateRange = convertRanges(ranges);
delegate.readVectored(delegateRange, delegateAllocate);
}

private static List<FileRange> convertRanges(List<ParquetFileRange> ranges) {
return ranges.stream()
.map(
Contributor Author comment:
This just maps between the internal Parquet/Hadoop range and the new Iceberg one.

parquetFileRange -> {
CompletableFuture<ByteBuffer> future = new CompletableFuture<>();
parquetFileRange.setDataReadFuture(future);
try {
return new FileRange(
parquetFileRange.getDataReadFuture(),
parquetFileRange.getOffset(),
parquetFileRange.getLength());
} catch (EOFException e) {
throw new RuntimeIOException(
e,
"Failed to create range file for offset: %s and length: %s",
parquetFileRange.getOffset(),
parquetFileRange.getLength());
}
})
.collect(Collectors.toList());
}
}

private static class ParquetOutputStreamAdapter extends DelegatingPositionOutputStream {
private final org.apache.iceberg.io.PositionOutputStream delegate;

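
Putting the ParquetIO pieces together: convertRanges wires a fresh future into each ParquetFileRange and mirrors it in a new Iceberg FileRange, so completing the Iceberg range also completes Parquet's read future. A rough, illustrative sketch of the flow through these package-private internals — names like icebergStream are hypothetical, and HeapByteBufferAllocator (org.apache.parquet.bytes) stands in for whatever allocator Parquet actually passes:

// Illustrative only: drive the adapter the way ParquetFileReader would.
org.apache.parquet.io.SeekableInputStream parquetStream = ParquetIO.stream(icebergStream);
List<ParquetFileRange> parquetRanges =
    Arrays.asList(new ParquetFileRange(0L, 4096), new ParquetFileRange(8192L, 4096));
parquetStream.readVectored(parquetRanges, new HeapByteBufferAllocator());
// convertRanges() set each range's data-read future, so the bytes arrive here:
ByteBuffer firstRange = parquetRanges.get(0).getDataReadFuture().join();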