-
Notifications
You must be signed in to change notification settings - Fork 2.9k
Parquet: page skipping using filtered row groups (vectorized and non-vectorized read) #10399
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
- When converting an Iceberg filter to a Parquet filter, and then using the converted filter in Parquet to filter row groups, Parquet checks that the type of a column in a filter predicate matches the type in the Parquet file as a validation step before applying the filter. This validation fails for some cases, e.g., INT96 timestamp columns. When doing the conversion, we thus need to check that such a type mismatch does not occur and fail the conversion if it does. - When converting the Iceberg filter to a Parquet filter fails, we need to handle the failure, and in ReadConf, we need to use the internally computed total number of rows instead of the values returned by the ParquetFileReader's getFilteredRecordCount(). - In ParquetReader.FileIterator, since we have to handle both cases where a Parquet record filter is used and where it is not, we avoid the skipNextRowGroup() and readNextFilteredRowGroup() methods of ParquetFileReader and instead proceed row group by row group and call readFilteredRowGroup(int) with the index of the row group.
Now that we track the offset in the Arrow vector using the ReadState, we don't need to pass the number of values in the vector, which is not used as it is no longer the actual offset as it does not account for skipped rows.
|
@rdblue @aokolnychyi @flyrain can you please review? |
|
For reference, the files that changed between #10228 and this PR are: arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java In addition, I posted the diff at https://gist.github.com/wypoon/855ac077c518e729451b9a4cb06fa818. |
|
This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions. |
|
This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time. |
This resolves #193.
This is the 3rd cumulative PR that builds on top of #10228 and #10107.
#10228 implements page skipping for the non-vectorized read path and this PR picks up from there and implements it for the vectorized read path.
The implementation of the vectorized case is based on the implementation in Spark's Parquet reader (see apache/spark#32753), which is in Spark 3.2.
As already implemented in #10228, we convert an Iceberg filter to a Parquet filter and use the Parquet filter to filter row groups, calling
ParquetFileReader#readFilteredRowGroup. We get row indexes from the returnedPageReadStoreby callingPageReadStore#getRowIndexes. Following the Spark implementation, we construct row ranges (using a simpleRowRange(not the internal ParquetRowRange) class capturing start and end indexes) from the row indexes (aPrimitiveIterator.OfLong).In order to synchronize the reading of the columns (since different columns can have different number of pages and thus pages in different columns start at different row indexes), we get the index of the first row in a page by calling
DataPage#getFirstRowIndexand we need to track the current row index in the column and the current offset in the Arrow vector we are reading the column values into. Previously, the read state only needed to account for the number of rows read so far and the number of rows left to read, but now "reading" includes both actual reading into the Arrow vector and skipping rows. We therefore use aReadStateobject (following Spark'sParquetReadState) to track the offset and row index as well as the current row range to read, and to advance them.There is a lot of structurally similar code in several places in
VectorizedParquetDefinitionLevelReader. I have refactored the similar code into one place inVectorizedParquetDefinitionLevelReader. Then I can modify the logic in this one place to perform the synchronization (skipping rows to the start of the row range). The rest of the changes implement how to skip rows for different column types.