[SPARK-44493][SQL] Translate catalyst expression into partial datasource filter #43769
Conversation
dongjoon-hyun left a comment:
Thank you for pinging me, @wangyum.
This is not a correctness patch, but the PR description might look misleading. Could you avoid the word "correct" in your PR description?
Also, cc @huaxingao too.
Also, cc @cloud-fan from #19776, too.
Before:
== Physical Plan ==
*(1) Project [NAME#7, THEID#8]
+- *(1) Filter (((THEID#8 > 0) AND (trim(NAME#7, None) = mary)) OR (THEID#8 > 10))
+- BatchScan json file:/private/var/folders/j0/b7tktmwj1pl591yd6ysvv5sm0000gq/T/spark-ea31f369-4c0e-442b-941d-bbde3be2f41b[NAME#7, THEID#8] JsonScan DataFilters: [(((THEID#8 > 0) AND (trim(NAME#7, None) = mary)) OR (THEID#8 > 10))], Format: json, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/j0/b7tktmwj1pl591yd6ysvv5sm0000gq/T/spark-ea..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<NAME:string,THEID:int> RuntimeFilters: []
After:
== Physical Plan ==
*(1) Project [NAME#7, THEID#8]
+- *(1) Filter (((THEID#8 > 0) AND (trim(NAME#7, None) = mary)) OR (THEID#8 > 10))
+- BatchScan json file:/private/var/folders/j0/b7tktmwj1pl591yd6ysvv5sm0000gq/T/spark-aaa18d17-865a-45d0-9bc1-e8e90855add5[NAME#7, THEID#8] JsonScan DataFilters: [(((THEID#8 > 0) AND (trim(NAME#7, None) = mary)) OR (THEID#8 > 10))], Format: json, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/j0/b7tktmwj1pl591yd6ysvv5sm0000gq/T/spark-aa..., PartitionFilters: [], PushedFilters: [Or(GreaterThan(THEID,0),GreaterThan(THEID,10))], ReadSchema: struct<NAME:string,THEID:int> RuntimeFilters: []
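For reference, a query along these lines reproduces the shape of the plans above (the path and column types are illustrative, reconstructed from the plan output rather than copied from the PR):

```scala
// Hypothetical reproduction: a JSON source with columns NAME (string) and THEID (int),
// filtered by a predicate that is only partially translatable to data source filters.
val df = spark.read.json("/tmp/people.json")
df.filter("(THEID > 0 AND trim(NAME) = 'mary') OR THEID > 10").explain()
```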
What is the difference?
PushedFilters is not empty:
PushedFilters: [Or(GreaterThan(THEID,0),GreaterThan(THEID,10))]
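To spell out where that pushed filter comes from (a sketch using Spark's public org.apache.spark.sql.sources filter classes, not the PR's exact code): trim(NAME) = 'mary' has no data source Filter equivalent, so translating the whole predicate fails today. With partial translation, the untranslatable conjunct can be dropped inside the AND, which only weakens that branch to THEID > 0; this is safe because the Filter node above the scan still evaluates the full predicate.

```scala
import org.apache.spark.sql.sources.{Filter, GreaterThan, Or}

// ((THEID > 0) AND (trim(NAME) = 'mary')) OR (THEID > 10)
// loses the untranslatable trim(...) conjunct and becomes:
val pushed: Filter = Or(GreaterThan("THEID", 0), GreaterThan("THEID", 10))
```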
I feel we can partially push down for each data source (including JDBC databases).
Please tell me the exceptions, if this is not possible.
Those are two different things.
Force-pushed from 8d91a3c to c3b41e2.
/**
 * Returns a set with all the filters present in the physical plan.
 */
def getPhysicalFilters(df: DataFrame): ExpressionSet = {
  ExpressionSet(
    df.queryExecution.executedPlan.collect {
      case execution.FilterExec(f, _) => splitConjunctivePredicates(f)
    }.flatten)
}

/**
 * Returns a resolved expression for `str` in the context of `df`.
 */
def resolve(df: DataFrame, str: String): Expression = {
  df.select(expr(str)).queryExecution.analyzed.expressions.head.children.head
}
Moved from FileSourceStrategySuite.scala
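A minimal sketch of how a suite might use these helpers (illustrative only; `spark` is assumed to be an active SparkSession in the test, and the membership check is just an example):

```scala
// Sketch, not taken from the PR: exercise the helpers on a simple filtered DataFrame.
val df = spark.range(10).toDF("id").filter("id > 3 AND id < 8")
val physicalFilters = getPhysicalFilters(df) // conjuncts kept in FilterExec nodes
val idGt3 = resolve(df, "id > 3")            // resolved form of one expected conjunct
// A test would then assert membership, e.g. physicalFilters.contains(idGt3).
```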
beliefer left a comment:
Is the partial push down due to the Parquet or ORC partitions?
What is the difference?
cc @dongjoon-hyun I have already fixed the PR description.
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
What changes were proposed in this pull request?
SPARK-22548 fixed an incorrect nested AND expression being pushed down to the JDBC data source. But Parquet/ORC data sources do not need that fix because they always keep the Filter node (the Before/After plans earlier in this thread show an example).
This PR adds a function parameter (named canPartialPushDown) to DataSourceStrategy.translateFilter. The caller side can set it to true if it always keeps the Filter node, so that more filters can be pushed down.
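A rough sketch of what an opted-in call site might look like (the parameter name comes from this PR's description; everything else, including parameter order and the dataFilters variable, is an assumption for illustration, not the PR's actual code):

```scala
// Hypothetical call site inside a file source strategy. File sources keep the Filter
// node above the scan, so dropping untranslatable conjuncts during translation is safe.
val pushedFilters: Seq[org.apache.spark.sql.sources.Filter] =
  dataFilters.flatMap { catalystExpr =>
    DataSourceStrategy.translateFilter(
      catalystExpr,
      supportNestedPredicatePushdown = true, // existing parameter
      canPartialPushDown = true)             // parameter proposed by this PR
  }
```

A caller that cannot guarantee the Filter node is kept would pass false and keep today's behaviour.

Why are the changes needed?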
Pushing down more filters improves query performance. Partially translatable predicates are a very common case, so the change can significantly improve query performance.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Unit test and benchmark test:
Was this patch authored or co-authored using generative AI tooling?
No.