Skip reading Parquet pages using Column Indexes feature of Parquet 1.11 #17216

Closed

shangxinli wants to merge 6 commits into prestodb:master from shangxinli:column_indexes_dev_new

Conversation

@shangxinli
Collaborator

Test plan - (Please fill in how you tested your changes)

Please make sure your submission complies with our Development, Formatting, and Commit Message guidelines. Don't forget to follow our attribution guidelines for any code copied from other projects.

Fill in the release notes towards the bottom of the PR description.
See Release Notes Guidelines for details.

== RELEASE NOTES ==

General Changes
* ...
* ...

Hive Changes
* ...
* ...

If release note is NOT required, use:

== NO RELEASE NOTE ==

@shangxinli force-pushed the column_indexes_dev_new branch 2 times, most recently from 1cb54cd to c8b834f on January 23, 2022 05:44
@shangxinli force-pushed the column_indexes_dev_new branch from c8b834f to d12e7ff on January 23, 2022 18:16
Collaborator

@zhenxiao left a comment

Nice work, @shangxinli.
Left some comments.
I recently did a Parquet code refactor to speed up ParquetTupleDomainPredicate building; could you please rebase on the recent master?

List<ColumnIndexStore> blockIndexStores = new ArrayList<>();
for (BlockMetaData block : footerBlocks.build()) {
if (predicateMatches(parquetPredicate, block, finalDataSource, descriptorsByPath, parquetTupleDomain, failOnCorruptedParquetStatistics)) {
ColumnIndexStore ciStore = getColumnIndexStore(parquetPredicate, finalDataSource, block, descriptorsByPath, readColumnIndexFilter);
Collaborator

s/ciStore/columnIndexStore/g

Collaborator Author

Just replaced them

return getParquetType(prestoType, messageType, column, tableName, path);
}

private static ColumnIndexStore getColumnIndexStore(Predicate parquetPredicate, ParquetDataSource dataSource, BlockMetaData blockMetadata, Map<List<String>, RichColumnDescriptor> descriptorsByPath, boolean readColumnIndexFilter)
Collaborator

return Optional, instead of null?

Collaborator Author

We talked earlier with @beinan about whether we should return Optional or null from this method. For this use case, even if we use Optional, we still need to check .isPresent(). That is pretty much the same as null checking, so there is not much value in doing so. Here is a good explanation of null checking vs. Optional checking: https://medium.com/javarevisited/null-check-vs-optional-are-they-same-c361d15fade3.

Collaborator

Got it. I think we try to use Optional as much as possible, to get rid of NullPointerExceptions. What do you think, @beinan?

Collaborator Author

@shangxinli Feb 9, 2022

I am fine with replacing it with Optional, but when I tried it, we still end up with null checking. This is because ColumnIndexStore.java, defined in the Parquet repo, uses null checking, so the class ParquetColumnIndexStore that extends ColumnIndexStore needs to have the same signature. If we replace it with Optional, we end up converting from null checking to Optional.empty() checking in HDFSParquetDataSource, and then back from Optional.empty() to null checking in ParquetColumnIndexStore. Since we only use the empty() API of Optional, as that article mentioned, null checking and empty() are not much different in this case. Thoughts?
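The boundary problem described above can be sketched in a few lines. This is an illustrative example, not the actual Presto/Parquet signatures: the interface and method names are stand-ins, and the point is only that wrapping a nullable return in Optional on one side still forces an unwrap back to null at the interface boundary.

```java
import java.util.Optional;

// Sketch of the Optional-vs-null boundary issue discussed above.
// IndexStore is a hypothetical stand-in for the Parquet-side interface.
public class OptionalBoundary
{
    // The Parquet-side interface is defined with a nullable return value.
    interface IndexStore
    {
        String getIndex(String columnPath); // may return null
    }

    // Presto-side code can wrap the nullable result in an Optional...
    static Optional<String> lookupIndex(IndexStore store, String columnPath)
    {
        return Optional.ofNullable(store.getIndex(columnPath));
    }

    // ...but to satisfy the Parquet interface it must be unwrapped back to a
    // nullable value, so the null check is only moved, not removed.
    static String toNullable(Optional<String> index)
    {
        return index.orElse(null);
    }
}
```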

Collaborator

Got it. Yep, too bad Presto and Parquet have different coding styles.
How about we use Optional as much as possible in Presto code? This reminds me of the old days when trying to implement the new Parquet reader for Presto :)

Member

I have the same feeling; I also suggested using Optional in the old PRs.

TupleDomain<DeltaColumnHandle> effectivePredicate,
FileFormatDataSourceStats stats)
FileFormatDataSourceStats stats,
boolean readColumnIndexFilter)
Collaborator

s/readColumnIndexFilter/columnIndexFilterEnabled/g

Collaborator Author

sure

}

boolean hasColumnIndex = false;
for (ColumnChunkMetaData column : blockMetadata.getColumns()) {
Collaborator

could we rewrite this with stream api? use anyMatch?

Collaborator Author

Great idea!
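The suggested rewrite is straightforward to sketch. ColumnChunk below is a hypothetical, minimal stand-in for ColumnChunkMetaData exposing only the one check the loop needs; both forms short-circuit on the first match.

```java
import java.util.List;

// Sketch of rewriting the hasColumnIndex loop with Stream.anyMatch,
// as suggested in the review. ColumnChunk is an illustrative stand-in.
public class ColumnIndexCheck
{
    interface ColumnChunk
    {
        boolean hasColumnIndex();
    }

    // Loop form, as in the original patch:
    static boolean hasColumnIndexLoop(List<ColumnChunk> columns)
    {
        boolean hasColumnIndex = false;
        for (ColumnChunk column : columns) {
            if (column.hasColumnIndex()) {
                hasColumnIndex = true;
                break;
            }
        }
        return hasColumnIndex;
    }

    // Stream form suggested in the review; anyMatch also short-circuits:
    static boolean hasColumnIndexStream(List<ColumnChunk> columns)
    {
        return columns.stream().anyMatch(ColumnChunk::hasColumnIndex);
    }
}
```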

return this.materializedViewMissingPartitionsThreshold;
}

@Config("hive.parquet-use-column-index-filter")
Collaborator

how about:
hive.parquet-column-index-filter-enabled?

Collaborator Author

yeah
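For context, a Presto config property like this is a getter/setter pair on a config class. The sketch below omits the airlift @Config annotation so it compiles standalone; in Presto the setter would carry @Config("hive.parquet-column-index-filter-enabled"), and the field name here is an assumption.

```java
// Sketch of the renamed config property discussed above. In Presto the
// setter would be annotated with
// @Config("hive.parquet-column-index-filter-enabled"); the annotation is
// omitted here so the class compiles without the airlift dependency.
public class HiveConfigSketch
{
    private boolean parquetColumnIndexFilterEnabled;

    public boolean isParquetColumnIndexFilterEnabled()
    {
        return parquetColumnIndexFilterEnabled;
    }

    public HiveConfigSketch setParquetColumnIndexFilterEnabled(boolean enabled)
    {
        this.parquetColumnIndexFilterEnabled = enabled;
        return this; // fluent setters are the convention in Presto config classes
    }
}
```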

valueCount += readDataPageV1(pageHeader, uncompressedPageSize, compressedPageSize, pages);
firstRowIndex = PageReader.getFirstRowIndex(dataPageCount, offsetIndex);
valueCount += readDataPageV1(pageHeader, uncompressedPageSize, compressedPageSize, firstRowIndex, pages);
++dataPageCount;
Collaborator

s/++dataPageCount/dataPageCount = dataPageCount + 1/g

Collaborator Author

sure

return dataHeaderV2.getNum_values();
}

private boolean hasMorePages(long valuesCountReadSoFar, int dataPageCountReadSoFar)
Collaborator

s/valuesCountReadSoFar/valuesCount/g
s/dataPageCountReadSoFar/pagesCount/g

Collaborator Author

sure
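The renamed method's role can be sketched as follows. This is an assumption-laden illustration, not the actual PageReader code: the idea is that with an offset index the exact data-page count is known up front, so the reader can stop on page count rather than value count.

```java
// Sketch of the hasMorePages logic under discussion (illustrative, not the
// actual Presto PageReader). The totals come from chunk metadata; when an
// offset index is available the exact page count is known, otherwise we
// fall back to counting values.
public class PageReaderSketch
{
    private final long totalValueCount;
    private final int totalPageCount; // -1 when no offset index is available

    PageReaderSketch(long totalValueCount, int totalPageCount)
    {
        this.totalValueCount = totalValueCount;
        this.totalPageCount = totalPageCount;
    }

    boolean hasMorePages(long valuesCount, int pagesCount)
    {
        return totalPageCount < 0
                ? valuesCount < totalValueCount
                : pagesCount < totalPageCount;
    }
}
```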

currentBlock = currentBlock + 1;

if (filter != null && readColumnIndexFilter) {
ColumnIndexStore ciStore = blockIndexStores.get(currentBlock);
Collaborator

s/ciStore/columnIndexStore/g

Collaborator Author

sure

OffsetIndex offsetIndex = blockIndexStores.get(currentBlock).getOffsetIndex(metadata.getPath());
OffsetIndex filteredOffsetIndex = ColumnIndexFilterUtils.filterOffsetIndex(offsetIndex, currentGroupRowRanges, blocks.get(currentBlock).getRowCount());
List<OffsetRange> offsetRanges = ColumnIndexFilterUtils.calculateOffsetRanges(filteredOffsetIndex, metadata, offsetIndex.getOffset(0), startingPosition);
List<ConsecutivePartList> allParts = concatRanges(offsetRanges);
Collaborator

s/allParts/offsetRanges/g

Collaborator Author

We already have the variable offsetRanges. We concat it.

Collaborator

Got it. How about consecutiveRanges?
Generally, we do not use variable abbreviations in Presto.

Collaborator Author

Changed it.

* Describes a list of consecutive parts to be read at once. A consecutive part may contain whole column chunks or
* only parts of them (some pages).
*/
private class ConsecutivePartList
Collaborator

s/ConsecutivePartList/PageRanges/g

Collaborator Author

This name is from Parquet. Better to keep the same name? What do you think?

Collaborator

Got it. We could either add a comment above describing that the private class is from Parquet, or rename it to PageRanges.

Collaborator Author

I just replaced it with PageRanges.

@shangxinli
Collaborator Author

nice work, @shangxinli left some comments recently did a parquet code refactor, to speedup ParquetTupleDomainPredicate building, could you please rebase on the recent master?

Yes, did it. Thanks for letting me know.

@shangxinli force-pushed the column_indexes_dev_new branch from 25f3e5b to 6760bb1 on January 31, 2022 05:38
@shangxinli force-pushed the column_indexes_dev_new branch from 563a586 to 6bccc1e on January 31, 2022 14:41
@shangxinli
Collaborator Author

Except for the known build issue, @beinan @zhenxiao Do you still have other comments?

This is a pretty big change and it often has conflicts when new commits are in. The conflicts already happened 3 times and it is a painful process to manually resolve those conflicts.

Collaborator

@zhenxiao left a comment

thank you, @shangxinli
mostly looks nice. some minor things

* values (and the related rl and dl) for the rows [20, 39] in the end of the page 0 for col2. Similarly, we have to
* skip values while reading page0 and page1 for col3.
*/
private void processValuesSync(int valuesToRead, Consumer<Void> valueConsumer)
Collaborator

I am still inclined to merge the two functions; the signatures are the same. The new function would look like:
private void processValues(int valuesToRead, Consumer<Void> valueConsumer, boolean indexEnabled)
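The merge suggested above can be sketched as a single method behind an indexEnabled flag. Everything here is illustrative: the Consumer<Void> from the review is swapped for IntConsumer so the sketch has something to consume, and the keepRow mask stands in for the row ranges computed from the column index.

```java
import java.util.function.IntConsumer;

// Sketch of merging the plain and index-filtered value-processing paths
// into one method, as suggested. keepRow is a hypothetical stand-in for
// the row ranges derived from the column index.
public class ValueProcessor
{
    static int processValues(int valuesToRead, IntConsumer valueConsumer, boolean indexEnabled, boolean[] keepRow)
    {
        int consumed = 0;
        for (int i = 0; i < valuesToRead; i++) {
            if (indexEnabled && !keepRow[i]) {
                continue; // skip values filtered out by the column index
            }
            valueConsumer.accept(i);
            consumed++;
        }
        return consumed;
    }
}
```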

}
}

// Used for columns are not in this parquet file
Collaborator

we could remove this comment. Or,
for columns not in this parquet file

Collaborator Author

We can remove the comments


@shangxinli force-pushed the column_indexes_dev_new branch from 94c2ffa to e94aada on February 10, 2022 01:21
Collaborator

@zhenxiao left a comment

Looks nice, @shangxinli. One minor issue: could you please squash all commits into one and add a release note?
Also, make sure all tests pass.

Collaborator

let's add a message to the exception, like:
parquet type not supported: %s
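The suggestion above amounts to formatting the offending value into the exception message. A minimal sketch, where the exception type and the helper name are assumptions:

```java
// Sketch of the suggested exception message; the exception type and the
// helper method are illustrative, not the actual Presto code.
public class TypeErrors
{
    static RuntimeException unsupportedParquetType(Object type)
    {
        return new IllegalArgumentException(String.format("parquet type not supported: %s", type));
    }
}
```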

Member

@beinan left a comment

LGTM, I think I reviewed most of the code changes in the previous PRs. Thank you @shangxinli! Looking forward to seeing more contributions from the Parquet community! Thanks!

@beinan
Member

beinan commented Feb 10, 2022

rerun the tests

@shangxinli
Collaborator Author

Thanks @beinan @zhenxiao for your time to review!

@rongrong
Contributor

We don't do merge commits, please rebase your changes onto master and squash all changes into one commit. Thanks!

@shangxinli force-pushed the column_indexes_dev_new branch from ece19ac to 47bfd42 on February 10, 2022 21:41
@shangxinli
Collaborator Author

@rongrong Does that mean I need to create a new PR?

@shangxinli
Collaborator Author

Created a new PR #17284 to rebase due to so many conflicts.
