[SPARK-11164] [SQL] Add InSet pushdown filter back for Parquet #10278
Conversation
|
After reading the other push-down PR, I think this also needs a review from @liancheng. Any comments are welcome! Thanks! |
|
Test build #47615 has finished for PR 10278 at commit
|
|
Test build #47616 has finished for PR 10278 at commit
|
|
Do you have a test case that actually shows a wrong answer being computed? |
|
This only happens in 1.5. Do you need me to write a test case for 1.5? |
|
Any bug fix should have a regression test. We could always change the optimizer in a way that does not hide this bug anymore. |
|
Ok, I will try to force it. Thanks! |
|
It's fine if the test only fails on 1.5 |
|
Great! : ) Let me also post the test case I ran against the latest 1.5. Without my fix, the first call of show() did not return the row (2, 0). Feel free to let me know if you want me to deliver the following test case.

```scala
withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
  withTempPath { dir =>
    val path = s"${dir.getCanonicalPath}/table1"
    (1 to 5).map(i => (i, (i % 2).toString)).toDF("a", "b").write.parquet(path)
    val df = sqlContext.read.parquet(path).where("not (a = 2 and b in ('1'))")
    df.show()
    val df1 = sqlContext.read.parquet(path).where("not (a = 2) or not(b in ('1'))")
    df1.show()
  }
}
``` |
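For context, the two WHERE clauses in the test case above are equivalent by De Morgan's law, so a correct scan must return the same rows for both, including (2, "0"). A minimal Python sketch (plain tuples in place of DataFrames, not Spark code) that checks this over the same generated data:

```python
# The two predicates are equivalent: not(p and q) == (not p) or (not q).
# Over this data no row matches (a = 2 AND b = '1'), so a correct scan
# returns all five rows, including (2, "0").
rows = [(i, str(i % 2)) for i in range(1, 6)]   # (1,"1"), (2,"0"), ..., (5,"1")

pred1 = lambda a, b: not (a == 2 and b in ("1",))
pred2 = lambda a, b: (not a == 2) or (b not in ("1",))

out1 = [r for r in rows if pred1(*r)]
out2 = [r for r in rows if pred2(*r)]

assert out1 == out2 == rows     # equivalent predicates; no row is filtered out
assert (2, "0") in out1         # the row the buggy pushdown dropped
```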
|
I might find another bug in Parquet pushdown. Will submit the fix later when I can confirm it. |
|
Test build #47618 has finished for PR 10278 at commit
|
|
@gatorsmile Sorry for the late reply and thanks for the nice catch!

One benefit of CNF is that it enables more filter push-down opportunities. Since we don't have existential / universal quantifiers in our predicates, I think CNF conversion in Spark SQL can be as simple as repeatedly applying rewrite rules like the following:

```scala
object CNFConversion extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case filter: Filter =>
      import org.apache.spark.sql.catalyst.dsl.expressions._
      filter.copy(condition = filter.condition.transform {
        case Not(x Or y) => !x && !y
        case Not(x And y) => !x || !y
        case (x And y) Or z => (x || z) && (y || z)
        case x Or (y And z) => (x || y) && (x || z)
      })
  }
}
```

(Notice that this version doesn't handle common expression elimination.)

@rxin @marmbrus I'm not super confident about the CNF conversion conclusion above, please correct me if I'm wrong. |
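To make the four rewrite rules above concrete, here is a small Python sketch that applies them to fixpoint over a toy boolean AST. This is an illustration of the idea only, not Spark's `Expression` tree; all class and function names (`Var`, `to_cnf`, etc.) are hypothetical.

```python
from dataclasses import dataclass

class Expr: pass

@dataclass(frozen=True)
class Var(Expr):
    name: str

@dataclass(frozen=True)
class Not(Expr):
    child: Expr

@dataclass(frozen=True)
class And(Expr):
    left: Expr
    right: Expr

@dataclass(frozen=True)
class Or(Expr):
    left: Expr
    right: Expr

def to_cnf(e: Expr) -> Expr:
    """Normalize children first, then apply the four rewrite rules at this node."""
    if isinstance(e, Not):
        e = Not(to_cnf(e.child))
    elif isinstance(e, And):
        e = And(to_cnf(e.left), to_cnf(e.right))
    elif isinstance(e, Or):
        e = Or(to_cnf(e.left), to_cnf(e.right))
    if isinstance(e, Not) and isinstance(e.child, Or):    # Not(x Or y)  => !x && !y
        return to_cnf(And(Not(e.child.left), Not(e.child.right)))
    if isinstance(e, Not) and isinstance(e.child, And):   # Not(x And y) => !x || !y
        return to_cnf(Or(Not(e.child.left), Not(e.child.right)))
    if isinstance(e, Or) and isinstance(e.left, And):     # (x And y) Or z => (x||z) && (y||z)
        return to_cnf(And(Or(e.left.left, e.right), Or(e.left.right, e.right)))
    if isinstance(e, Or) and isinstance(e.right, And):    # x Or (y And z) => (x||y) && (x||z)
        return to_cnf(And(Or(e.left, e.right.left), Or(e.left, e.right.right)))
    return e

a, b = Var("a"), Var("b")
# Not(a And b) rewrites to Not(a) Or Not(b), matching the second rule above.
assert to_cnf(Not(And(a, b))) == Or(Not(a), Not(b))
```

After conversion, each top-level conjunct can be considered for pushdown independently, which is exactly the extra opportunity CNF buys.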
Not is included in Parquet filter pushdown
|
Thank you for your detailed explanation! @liancheng I have the same opinion as @marmbrus. We should include CNF conversion in our optimizer. Some RDBMSs do it in the query rewriting phase. Below are my two cents about CNF. Generally, CNF conversion is an important concept in query optimization, especially when we support indexing in Spark. When (multi-attribute) indexes exist over some subset of conjuncts, we can employ these indexes to improve selectivity. Thanks! |
Looks like this is the real problem. It is not safe to just push one side here. This is the place where we drop that In because createFilter(schema, In(...)) returns None.
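The reviewer's point can be illustrated with a small Python sketch. It mimics the Option-returning conversion (a callable for supported predicates, `None` for unsupported ones like `In`); the helper names (`create_eq`, `negate`, etc.) are hypothetical, not Parquet's or Spark's API:

```python
# createFilter returns a callable for supported predicates and None for
# unsupported ones (here, In is unsupported, mirroring
# createFilter(schema, In(...)) returning None).
rows = [(i, str(i % 2)) for i in range(1, 6)]          # (a, b) pairs

def create_eq(col, v):
    return lambda row: row[col] == v                   # supported: Eq

def create_in(col, vals):
    return None                                        # unsupported: In

def negate(f):
    return None if f is None else (lambda row: not f(row))

left, right = create_eq(0, 2), create_in(1, ("1",))

# Sound behavior: Not(And(l, r)) is pushable only if BOTH conjuncts convert,
# so here nothing should be pushed at all.
assert negate(right) is None

# Buggy behavior: push Not(left) alone, silently dropping the In conjunct.
buggy = negate(left)
survivors = [row for row in rows if buggy(row)]
assert (2, "0") not in survivors    # wrong answer: (2, "0") should be kept
```

The query `not (a = 2 and b in ('1'))` keeps row (2, "0"), but the one-sided pushdown `not (a = 2)` discards it, which is exactly the wrong answer observed in the test case.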
@gatorsmile I am also going to try to have a fix. We can later see which one is more suitable.
Glad to see your fix. : ) Thank you!
|
Test build #47939 has finished for PR 10278 at commit
|
|
@yhuai @liancheng Regarding this PR, should I keep it open? This PR also has another fix that can push down the filter |
|
You can change it to just handle |
|
After reading the contents, I see that PR was closed due to another issue with String filters in the same PR. Please correct me if my understanding is wrong, @liancheng. Do you want to deliver this by submitting another PR, @viirya? Either is fine for me. Thanks! |
# Conflicts: # sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
|
Test build #48036 has finished for PR 10278 at commit
|
|
retest this please |
|
Test build #48045 has finished for PR 10278 at commit
|
|
@gatorsmile Yeah, you're right. #8956 initially aimed to fix other issues, and also included this fix. BTW, it's almost always strictly better to open small PRs that contain ONLY a single change than bigger ones that contain multiple changes. The former are much easier to review and get merged. (One-liner PRs are super welcome!) |
|
Thank you for your suggestions! @liancheng : ) Next time, I will not mix multiple fixes in the same PR. |
|
@gatorsmile Could you please update the PR description? |
|
@liancheng Done. : ) |
|
Thanks! I'm merging this to master, and will attribute this one to @viirya. |
|
Thank you! |
|
Thanks @gatorsmile @liancheng |
When the filter is "b in ('1', '2')", the filter is not pushed down to Parquet. Thanks!