Spark 3.2: Fix predicate pushdown in row-level operations #4023
Conversation
Force-pushed from 76317e5 to 383d2d6
LGTM, one small comment.
And just for my understanding (as I'm not super familiar): this issue seems to affect only conjunctive predicates, i.e. those combined with AND. I was reading the method `splitConjunctivePredicates`; do we never push down ORs in any case?
```diff
-    val (scan, output) = PushDownUtils.pruneColumns(
-      scanBuilder, relation, relation.output, Seq.empty)
+    val (scan, output) = PushDownUtils.pruneColumns(scanBuilder, relation, relation.output, Nil)
```
Nit: is it a necessary change, `Seq.empty` => `Nil`?
I changed it so that it can fit on one line just like the new filter pushdown logic.
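For reference, the two forms are interchangeable here; a minimal illustration (not from the PR, just plain Scala semantics):

```scala
// Nil is Scala's canonical empty List and compares equal to Seq.empty,
// so passing either as the "no filters" argument behaves identically.
val a: Seq[Int] = Seq.empty
val b: Seq[Int] = Nil
assert(a == b) // both are empty sequences; the change is purely stylistic
```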
Hi @aokolnychyi @szehon-ho, any reason for not passing `pushedFilters` here instead of `Nil`?
actually nvm, this is only to prune columns.
singhpk234 left a comment
Looks good to me. Thanks @aokolnychyi !!!
```scala
    tableAttrs: Seq[AttributeReference]): (Seq[Filter], Seq[Expression]) = {

  val tableAttrSet = AttributeSet(tableAttrs)
  val filters = splitConjunctivePredicates(cond).filter(_.references.subsetOf(tableAttrSet))
```
Was this what was preventing pushdown before? We weren't filtering out expressions that referenced columns outside of the table?
Yes. We did not split the condition into parts before, and we did not remove filters that referenced both tables.
@szehon-ho ^^
This comment provides a little bit more info to answer your question above.
We treated `t.id = s.id AND t.dep IN ('hr')` as a single predicate that couldn't be converted, as it referenced both tables. Instead, we now split it into parts and convert whatever we can (i.e. `t.dep IN ('hr')` in this case).
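To illustrate the mechanics, here is a minimal, self-contained sketch; the `Expr`/`Predicate` types are stand-ins for Spark's `Expression` and `AttributeSet`, not the actual implementation. It also answers the OR question above: only ANDs are split, so an OR stays whole and is pushable only if all of its references belong to the target table.

```scala
sealed trait Expr { def references: Set[String] }
case class And(left: Expr, right: Expr) extends Expr {
  def references: Set[String] = left.references ++ right.references
}
case class Predicate(sql: String, references: Set[String]) extends Expr

// Mirrors the idea of PredicateHelper.splitConjunctivePredicates:
// recursively split on AND; anything else (including ORs) is kept whole.
def splitConjunctivePredicates(cond: Expr): Seq[Expr] = cond match {
  case And(l, r) => splitConjunctivePredicates(l) ++ splitConjunctivePredicates(r)
  case other     => Seq(other)
}

val targetAttrs = Set("t.id", "t.dep")
val cond = And(
  Predicate("t.id = s.id", Set("t.id", "s.id")),
  Predicate("t.dep IN ('hr')", Set("t.dep")))

// Keep only the parts that reference the target table exclusively.
val pushable = splitConjunctivePredicates(cond)
  .filter(_.references.subsetOf(targetAttrs))
// pushable == Seq(Predicate("t.dep IN ('hr')", Set("t.dep")))
```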
```java
Snapshot mergeSnapshot = table.currentSnapshot();
String deletedDataFilesCount = mergeSnapshot.summary().get(SnapshotSummary.DELETED_FILES_PROP);
Assert.assertEquals("Must overwrite only 1 file", "1", deletedDataFilesCount);
```
Other tests use the listener to check the expressions that were pushed down directly. Should we do that in this test?
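A rough sketch of what such a listener-based check might look like, using Spark's `QueryExecutionListener` (the exact helpers in Iceberg's test suite may differ; the class and assertion here are assumptions for illustration):

```scala
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

// Capture the last successfully executed query so the test can inspect
// its physical plan for the pushed-down filter.
class PushedFiltersListener extends QueryExecutionListener {
  @volatile var lastQuery: Option[QueryExecution] = None
  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
    lastQuery = Some(qe)
  }
  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = ()
}

// Usage (illustrative): register, run the MERGE, then check the scan node.
// val listener = new PushedFiltersListener
// spark.listenerManager.register(listener)
// ... run the MERGE ...
// assert(listener.lastQuery.exists(_.executedPlan.toString.contains("dep IN ('hr')")))
```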
I think I missed that. Could you point me to an example?
Thanks for reviewing, @singhpk234 @szehon-ho @rdblue!
(cherry picked from commit 5d599e1)
This PR fixes predicate pushdown in row-level operations in Spark 3.2. Previously, we did not extract filters, so MERGE conditions such as `t.id = s.id AND t.dep IN ('hr')` would not be pushed down.
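For instance, in a MERGE like the following (assuming a `SparkSession` named `spark`; the table and column names are illustrative, not from the PR), the single-table part of the ON clause is now pushed into the target scan:

```scala
// After this fix, `t.dep IN ('hr')` is pushed down to the scan of `target`,
// while the cross-table join predicate `t.id = s.id` remains unpushable.
spark.sql(
  """MERGE INTO target t
    |USING source s
    |ON t.id = s.id AND t.dep IN ('hr')
    |WHEN MATCHED THEN UPDATE SET t.salary = s.salary
    |""".stripMargin)
```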