core: Adding read vector to range readable interface and adding mapper to parquet stream #13997
Conversation
```java
private static List<ParquetObjectRange> convertRanges(List<ParquetFileRange> ranges) {
  return ranges.stream()
      .map(
          // ... excerpt truncated in the diff view
```
This just maps between the internal Parquet Hadoop range and the new Iceberg one.
fuatbasik left a comment:
Thanks a lot @stubz151! The interface and configuration flag look good to me. I left two small comments.
Force-pushed from 6cc3884 to 6e19605.
Force-pushed from 48d647d to fa97848.
Force-pushed from fa97848 to 8f05471.
```java
/*
 * This class is written by @mukundthakur, and taken from
 * /hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java (thank you!).
 */
public final class VectoredReadUtils {
  // ...
```
I don't feel like we need this class. There are three things it does, but it should probably be just one. validateRangeRequest should be handled in the constructor of FileRange (we currently don't have any validation there), and sortRangeList is a subset of validateAndSortRanges, which seems duplicative.
I'd suggest moving validateAndSort to the RangeReadable interface as a static utility that implementors can use, and avoiding this util class.
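A minimal sketch of that suggestion, assuming FileRange exposes offset() and length() accessors (accessor names are assumptions, not the committed API):

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

public interface RangeReadable {

  // existing single-range read methods elided

  // combines validateRangeRequest and sortRangeList into one static utility,
  // so no separate VectoredReadUtils class is needed
  static List<FileRange> validateAndSortRanges(List<FileRange> ranges) {
    List<FileRange> sorted =
        ranges.stream()
            .sorted(Comparator.comparingLong(FileRange::offset))
            .collect(Collectors.toList());

    for (int i = 0; i < sorted.size(); i++) {
      FileRange range = sorted.get(i);
      // basic per-range validation, replacing validateRangeRequest
      if (range.offset() < 0 || range.length() < 0) {
        throw new IllegalArgumentException("Invalid range: " + range);
      }
      // sorted order makes the overlap check a single neighbor comparison
      if (i > 0) {
        FileRange previous = sorted.get(i - 1);
        if (previous.offset() + previous.length() > range.offset()) {
          throw new IllegalArgumentException(
              "Overlapping ranges: " + previous + " and " + range);
        }
      }
    }

    return sorted;
  }
}
```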
```java
private final long offset;
private final int length;

public FileRange(CompletableFuture<ByteBuffer> byteBuffer, long offset, int length) {
  // ...
```
Looking at the parquet implementation, I don't think you can pass the byteBuffer future in like this. I believe this is intended to be set by the implementation so that it can be returned to the invoker.
We're not passing the ByteBuffer in here; we're passing a future that completes with a ByteBuffer. We need a way to map the futures in Iceberg to the futures we are setting in Parquet.
So when we call parquetFileRange.setDataReadFuture(future), we need a way of tracking that future in Iceberg, and that's what this gives us.
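A hedged sketch of that wiring (the adapter method name is illustrative): one CompletableFuture is created per range, registered with Parquet via setDataReadFuture, and also carried inside the Iceberg-side FileRange, so completing the future on the Iceberg side unblocks Parquet.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.parquet.io.ParquetFileRange;

class RangeConversionSketch {
  static List<FileRange> toIcebergRanges(List<ParquetFileRange> parquetRanges) {
    List<FileRange> icebergRanges = new ArrayList<>();
    for (ParquetFileRange parquetRange : parquetRanges) {
      CompletableFuture<ByteBuffer> future = new CompletableFuture<>();
      parquetRange.setDataReadFuture(future); // Parquet's reader awaits this future
      // the same future instance is tracked on the Iceberg side
      icebergRanges.add(
          new FileRange(future, parquetRange.getOffset(), parquetRange.getLength()));
    }
    return icebergRanges;
  }
}
```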
Force-pushed from cdbffc4 to 65ce5a7.
```java
}

@Override
public void readVectored(List<ParquetFileRange> ranges, ByteBufferAllocator allocate)
    // ... excerpt truncated in the diff view
```
Can we add some tests at the ParquetIO level to validate this? I know we're adding some in S3FileIO, but it would be good to have this interface tested (even if there's a mock implementation)
I added testRangeReadableAdapterReadVectored, which does something similar to the tests in S3FileIO but focuses a bit more on checking that the buffers/ranges are used correctly. I skipped the other operations but can add them in if we want. Let me know.
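Illustrative of the buffer/range checks described here, not the PR's actual test: futures are completed from an in-memory byte array standing in for an object-store read, and each range's buffer is verified against its offset and length.

```java
import static org.assertj.core.api.Assertions.assertThat;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.parquet.io.ParquetFileRange;
import org.junit.jupiter.api.Test;

public class TestReadVectoredSketch {

  @Test
  public void buffersMatchRequestedRanges() {
    byte[] data = new byte[64];
    for (int i = 0; i < data.length; i++) {
      data[i] = (byte) i;
    }

    List<ParquetFileRange> ranges =
        Arrays.asList(new ParquetFileRange(0, 4), new ParquetFileRange(16, 8));

    // complete each range's future, as an implementation would when its read finishes
    for (ParquetFileRange range : ranges) {
      CompletableFuture<ByteBuffer> future = new CompletableFuture<>();
      future.complete(ByteBuffer.wrap(data, (int) range.getOffset(), range.getLength()));
      range.setDataReadFuture(future);
    }

    // each buffer must match its range's length and start at its range's offset
    for (ParquetFileRange range : ranges) {
      ByteBuffer buffer = range.getDataReadFuture().join();
      assertThat(buffer.remaining()).isEqualTo(range.getLength());
      assertThat(buffer.get(buffer.position())).isEqualTo(data[(int) range.getOffset()]);
    }
  }
}
```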
```java
  optionsBuilder.withDecryption(fileDecryptionProperties);
}

optionsBuilder.withUseHadoopVectoredIo(true);
```
There were some efforts to allow Iceberg to work without Hadoop on the classpath.
I'm not sure how far these efforts went, and I'm also not sure how this change will affect them.
Could you please help me understand the consequences of always using withUseHadoopVectoredIo?
Thanks,
Peter
For part 1, about the effort to reduce the dependencies on Hadoop: I don't think that was ever completed; I do see a TODO comment about wanting to do it. I am probably making the effort more complicated since I'm adding two new imports from Hadoop, but I don't think that is a big risk.
For part 2: withUseHadoopVectoredIo is used in the file reader in conjunction with readVectoredAvailable(), so always enabling it doesn't change anything unless the stream also supports readVectored.
https://github.com/apache/parquet-java/blob/f50dd6cb4b526cf4b585993c1b69a838cd8151f3/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java#L1303
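To make the fallback concrete, here is a paraphrase of the dispatch in the linked ParquetFileReader code; all names here are illustrative (vectoredIoEnabled stands in for withUseHadoopVectoredIo, and the stream methods mirror the ones discussed above), not parquet-java's exact signatures.

```java
import java.io.IOException;
import java.util.List;
import org.apache.parquet.io.ParquetFileRange;

class VectoredDispatchSketch {
  interface RangeStream {
    boolean readVectoredAvailable();

    void readVectored(List<ParquetFileRange> ranges) throws IOException;

    void readSequentially(List<ParquetFileRange> ranges) throws IOException;
  }

  static void read(RangeStream stream, List<ParquetFileRange> ranges, boolean vectoredIoEnabled)
      throws IOException {
    // vectored IO is taken only when the option is on AND the stream supports it,
    // so enabling the option unconditionally is safe for plain streams
    if (vectoredIoEnabled && stream.readVectoredAvailable()) {
      stream.readVectored(ranges);
    } else {
      stream.readSequentially(ranges); // existing per-range read path
    }
  }
}
```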
I think the naming of this option is a little misleading. withUseHadoopVectoredIo doesn't necessarily depend on Hadoop, as @stubz151 mentions, but rather enables the vectored IO behavior in Parquet.
Force-pushed from 861cdf3 to c6e04ea.
Force-pushed from d35250f to 278a0fb.
pvary left a comment:
It's ok from my side, but I would like to ask someone else to take a look as well.
Force-pushed from 278a0fb to bedfe8a.
Force-pushed from bedfe8a to 0aa316b.
Force-pushed from 0aa316b to 180613d.
danielcweeks left a comment:
Thanks @stubz151 !
Core: Adding read vector to range readable interface and adding mapper to parquet stream (apache#13997)
What am I doing
Adding a read vector implementation to the range readable interface. To do this, I'm adding methods to check whether it's enabled and providing an interface one can implement (see the sketch below).
#13254
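A minimal sketch of that shape: implementors advertise support via an enablement check and then provide the vectored read. Method and type names here are assumptions, not the committed signatures.

```java
import java.io.IOException;
import java.util.List;

public interface RangeReadable {

  // existing single-range read methods elided

  default boolean readVectoredAvailable() {
    return false; // overridden by implementations that support vectored reads
  }

  default void readVectored(List<FileRange> ranges) throws IOException {
    throw new UnsupportedOperationException("Vectored reads are not supported");
  }
}
```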
Changes
Testing
Tested with the AAL implementation; the flag is passed through correctly:
```
--conf "spark.sql.iceberg.read.vector.enabled=true" \
```
Notes
Kept the AAL changes separate to avoid bloating this PR, but I can include them if we want to see a functional implementation of this.