-
Notifications
You must be signed in to change notification settings - Fork 2.9k
Parquet: page skipping using filtered row groups for non-vectorized read #10228
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
- When converting an Iceberg filter to a Parquet filter, and then using the converted filter in Parquet to filter row groups, Parquet checks that the type of a column in a filter predicate matches the type in the Parquet file as a validation step before applying the filter. This validation fails for some cases, e.g., INT96 timestamp columns. When doing the conversion, we thus need to check that such a type mismatch does not occur and fail the conversion if it does. - When converting the Iceberg filter to a Parquet filter fails, we need to handle the failure, and in ReadConf, we need to use the internally computed total number of rows instead of the values returned by the ParquetFileReader's getFilteredRecordCount(). - In ParquetReader.FileIterator, since we have to handle both cases where a Parquet record filter is used and where it is not, we avoid the skipNextRowGroup() and readNextFilteredRowGroup() methods of ParquetFileReader and instead proceed row group by row group and call readFilteredRowGroup(int) with the index of the row group.
147ac9e to
fa0bd05
Compare
|
@zhongyujiang I would be happy to make you a co-author, but it was not easy to pull in commits from your PR directly. If you like, you can open a PR against my branch (even a dummy commit) and I can merge it and have you show up as a co-author. |
|
@sunchao @chenjunjiedada you may be interested in this. |
|
This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions. |
|
This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time. |
This builds on #10107.
It borrows and adapts code and the test
TestSparkParquetPageSkippingfrom @zhongyujiang's #6967.The difference in approach here is that we do not make use of any Parquet internal API. We simply convert the Iceberg filter to a Parquet filter and use
ParquetFileReader#readFilteredRowGroup(int)andPageReadStore#getRowIndexes().We borrow and adapt the code from #6967 for synchronizing the column readers (as each column might have different number of pages and so the columns might be at different row indexes when a filtered row group is read).
There are some limitations:
In this PR, we only implement the page skipping for the non-vectorized read path. We plan to work on the vectorized read path separately. In
TestSparkParquetPageSkipping, we test both vectorized and non-vectorized reads and there one can see the difference in the rows that are read (as page skipping is not implemented for the vectorized path).Due to the fact that before it performs the filtering, Parquet validates that the column type in predicates in the filter match the type of the column in the Parquet file, we have to skip using Parquet filtering in some cases, e.g., when a column is an INT96 timestamp.
Currently,
ParquetFilters.ConvertFilterToParquethandles only a small set of operators, so e.g., a filter with IN does not get converted. This can be improved independently.