[SPARK-37220][SQL] Do not split input file for Parquet reader with aggregate push down #34498
Conversation
@huaxingao, @sunchao and @viirya - could you help take a look when you have time? Thanks.
sunchao
left a comment
Thanks @c21, looks good just one nit.
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
override def isSplitable(path: Path): Boolean = {
  // If aggregate is pushed down, only the file footer will be read once,
  // so file should not be split across multiple tasks.
  pushedAggregate.isEmpty
}
👍
Kubernetes integration test starting
Kubernetes integration test starting
Kubernetes integration test status failure
Kubernetes integration test status failure
Test build #144942 has finished for PR 34498 at commit
Test build #144944 has finished for PR 34498 at commit
viirya
left a comment
Looks okay.
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
Test build #144945 has finished for PR 34498 at commit
Kubernetes integration test starting
Kubernetes integration test status failure
Test build #144948 has finished for PR 34498 at commit
Thank you @sunchao, @huaxingao and @viirya for the review!
Quick question on: How did the existing tests pass before this PR?
HyukjinKwon
left a comment
LGTM2
override def isSplitable(path: Path): Boolean = {
  // If aggregate is pushed down, only the file footer will be read once,
  // so file should not be split across multiple tasks.
  pushedAggregate.isEmpty
}
Oh, okay. Got it now.
// footers for every split of the file. Basically if the start (the beginning of)
// the offset in PartitionedFile is 0, we will read the footer. Otherwise, it means
// that we have already read footer for that file, so we will skip reading again.
if (file.start != 0) return null
Quick question on: Existing unit test in FileSourceAggregatePushDownSuite.scala. How did the existing tests pass before this PR?
@HyukjinKwon - I think we are on the same page based on your latest comment, but just to spell it out in case anything is missing: before this PR, when a single file was split into multiple splits across multiple tasks, the logic here processed a split only when file.start == 0. So only the first split of each file did the work, and every file's footer was read exactly once. That is the trick: even before this PR, the Parquet aggregate push down logic was correct. This PR updates the logic so the unnecessary file splitting is avoided in the first place.
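The pre-PR trick can be sketched in plain Scala like this (a simplified, hypothetical model for illustration; `PartitionedFile` here is a stand-in case class, not Spark's actual class):

```scala
// Stand-in for Spark's PartitionedFile: one split of a file assigned to a task.
case class PartitionedFile(path: String, start: Long, length: Long)

// Before the PR: even if a file was split across tasks, only the split whose
// start offset is 0 read the footer, so each file's footer was read exactly once.
def splitsThatReadFooter(splits: Seq[PartitionedFile]): Seq[PartitionedFile] =
  splits.filter(_.start == 0)

val splits = Seq(
  PartitionedFile("data.parquet", start = 0L, length = 64L),
  PartitionedFile("data.parquet", start = 64L, length = 64L)
)
println(splitsThatReadFooter(splits).length) // prints 1
```

The other tasks received splits with a non-zero start and returned early, which is why the existing tests passed even without disabling splitting.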
What changes were proposed in this pull request?
As a followup of https://github.com/apache/spark/pull/34298/files#r734795801, and similar to ORC aggregate push down, we can disallow splitting input files for the Parquet reader as well. See the original comment for more details on the motivation. Also fix the string of RowDataSourceScanExec to only print out PushedAggregates and PushedGroupby, to be aligned with PushedLimit and PushedSample; since not many queries can benefit from aggregate push down, we don't need to print that information unnecessarily.
Why are the changes needed?
Avoid unnecessary file splits in multiple tasks for Parquet reader with aggregate push down.
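The change boils down to the rule quoted in the diff above; a self-contained sketch (hypothetical model classes, not the actual Spark FileScan/ParquetScan API):

```scala
// Hypothetical model of the scan: when an aggregate is pushed down, the scan
// reports the file as non-splittable, so a single task reads the whole file
// (and therefore its footer) exactly once.
case class ParquetScanModel(pushedAggregate: Option[String]) {
  def isSplitable(path: String): Boolean = pushedAggregate.isEmpty
}

// With a pushed-down aggregate, the file must not be split across tasks.
assert(!ParquetScanModel(Some("MIN(id)")).isSplitable("data.parquet"))
// Without aggregate push down, normal splitting still applies.
assert(ParquetScanModel(None).isSplitable("data.parquet"))
```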
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Existing unit test in FileSourceAggregatePushDownSuite.scala.