[HUDI-3594] Supporting Composite Expressions over Data Table Columns in Data Skipping flow #4996
Conversation
Force-pushed 4e5830d to e4a45b8
Force-pushed 4446457 to 922bfbe
```scala
 * @param tableSchema table schema encompassing attributes to resolve against
 * @return Resolved filter expression
 */
def resolveExpr(spark: SparkSession, exprString: String, tableSchema: StructType): Expression = {
```
Did not change
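For readers following along, here's a minimal sketch of what resolving a filter string against a table schema can involve. This is an illustration, not the PR's implementation; the helper name and the empty-DataFrame trick are assumptions.

```scala
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.Filter
import org.apache.spark.sql.functions.expr
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Row, SparkSession}

// Illustrative sketch only: parse the predicate string, then let Spark's
// analyzer bind its attribute references by filtering an empty DataFrame
// that carries the target schema
def resolveExprSketch(spark: SparkSession, exprString: String, tableSchema: StructType): Expression = {
  val emptyDf = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], tableSchema)
  val analyzed = emptyDf.filter(expr(exprString)).queryExecution.analyzed
  // The analyzed plan contains Filter(resolvedCondition, relation); pull the condition out
  analyzed.collectFirst { case Filter(condition, _) => condition }.get
}
```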
```scala
 * @param partitionColumns The partition columns
 * @return (partitionFilters, dataFilters)
 */
def splitPartitionAndDataPredicates(sparkSession: SparkSession,
```
Did not change
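As an aside for reviewers, the conventional split rule is sketched below: a predicate qualifies as a partition filter iff it is deterministic and every attribute it references is a partition column. This is a simplified illustration, not the PR's exact code (which also threads a SparkSession through).

```scala
import org.apache.spark.sql.catalyst.expressions.Expression

// Hedged sketch of the usual partition/data predicate split
def splitPredicatesSketch(predicates: Seq[Expression],
                          partitionColumns: Set[String]): (Seq[Expression], Seq[Expression]) =
  predicates.partition { predicate =>
    // A deterministic predicate touching only partition columns can be
    // evaluated against partition values alone; everything else stays a data filter
    predicate.deterministic && predicate.references.forall(a => partitionColumns.contains(a.name))
  }
```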
vinothchandar left a comment:
The approach seems good to me. Just started reviewing.
@xiarixiaoyao Would you have some cycles to review as well, since you wrote a lot of the original skipping utils code?
```scala
 * limitations under the License.
 */

package org.apache.spark.sql
```
Is this code adapted from somewhere? If so, can you please add source attribution?
Nope, this is our code. Had to place it in spark.sql to access a package-private API.
```scala
private object OrderPreservingTransformation {
  def unapply(expr: Expression): Option[AttributeReference] = {
    expr match {
```
I guess these are the transformations that we whitelist?
Correct
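To make the whitelisting idea concrete, here is a self-contained sketch over a toy AST. This is deliberately not Catalyst's classes; every name below is made up for illustration.

```scala
// Toy AST standing in for Catalyst expressions
sealed trait Expr
final case class Attr(name: String) extends Expr
final case class ToDate(child: Expr) extends Expr       // order-preserving
final case class CastToLong(child: Expr) extends Expr   // order-preserving (for comparable types)
final case class Mul(left: Expr, right: Expr) extends Expr // NOT order-preserving in general

// Extractor that peels whitelisted, order-preserving wrappers off a single
// attribute reference; anything off the whitelist yields None
object OrderPreserving {
  def unapply(e: Expr): Option[Attr] = e match {
    case a: Attr                        => Some(a)
    case ToDate(OrderPreserving(a))     => Some(a)
    case CastToLong(OrderPreserving(a)) => Some(a)
    case _                              => None
  }
}

// Usage: ToDate(CastToLong(Attr("ts"))) matches; Mul(Attr("a"), Attr("b")) does not
```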
```scala
//
// This expression will be translated into following Filter Expression for the Column Stats Index:
//
// (transform_expr(colA_minValue) <= value_expr) AND (value_expr <= transform_expr(colA_maxValue))
```
Note to self: let's take an example that parses a timestamp ts column into a date using something like date_format:
date_format(ts, ...) = '2022-03-01'
We will simply look for files that overlap with that date. sgtm
There's a test covering exactly this use-case.
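A runnable illustration of that pruning check follows. File names and stats are made up; a 'yyyy-MM-dd' literal is used (as in the comment above) since that pattern compares consistently with the column's own ordering.

```scala
import java.time.format.DateTimeFormatter
import java.time.{Instant, ZoneOffset}

// Self-contained sketch of the translated check for
//   date_format(ts, 'yyyy-MM-dd') = '2022-03-01'
// evaluated against per-file min/max stats of `ts`
object DateFormatPruningSketch extends App {
  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC)
  private def transform(epochMillis: Long): String = fmt.format(Instant.ofEpochMilli(epochMillis))

  // (fileName, min(ts), max(ts)) column stats, epoch millis
  val fileStats = Seq(
    ("file_1", 1646092800000L, 1646179199000L), // spans 2022-03-01
    ("file_2", 1646697600000L, 1646783999000L)  // spans 2022-03-08
  )

  val target = "2022-03-01"
  // Keep files where transform(min) <= target <= transform(max)
  val candidates = fileStats.collect {
    case (file, min, max) if transform(min) <= target && target <= transform(max) => file
  }
  println(candidates) // List(file_1)
}
```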
vinothchandar left a comment:
If you can share some test results, we can land this. Otherwise lgtm.
@vinothchandar what test results are you referring to?
Force-pushed eac505a to fccd455
```scala
// NOTE: That we can apply transform_expr transformation precisely b/c it preserves the ordering of the
//       values of the source column, ie following holds true:
//
//       colA_minValue = min(colA) => transform_expr(colA_minValue) = min(transform_expr(colA))
```
Appreciate the detailed explanation.
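That property is easy to sanity-check in isolation. A tiny sketch with made-up sample data and a hypothetical order-preserving transform:

```scala
// For any monotonic (order-preserving) transform f over column values:
//   f(min(col)) == min(col.map(f))  and  f(max(col)) == max(col.map(f))
object OrderPreservationCheck extends App {
  val colA: Seq[Long] = Seq(1646092800000L, 1646179199000L, 1646697600000L)

  // Hypothetical order-preserving transform: epoch millis -> epoch day
  def transformExpr(millis: Long): Long = millis / 86400000L

  assert(transformExpr(colA.min) == colA.map(transformExpr).min)
  assert(transformExpr(colA.max) == colA.map(transformExpr).max)
  println("transform(min) == min(transform); transform(max) == max(transform)")
}
```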
```scala
  ),
  Seq("file_1")),
arguments(
  "date_format(C, 'MM/dd/yyyy') IN ('03/08/2022')",
```
Do we have support for AND with our data skipping?
date_format(C, 'MM/dd/yyyy') >= '03/08/2022' and date_format(C, 'MM/dd/yyyy') <= '03/10/2022'
I understand the test may not go into this test class; just asking in general, do we have tests elsewhere to cover this case?
Also, can we add a test for matching multiple entries?
date_format(C, 'MM/dd/yyyy') IN ('03/08/2022', '03/09/2022')
I reviewed the data skipping class. Looks like IN w/ multiple values is supported.
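For reference, IN over several literals typically reduces to an OR of per-literal containment checks against the same stats range. A minimal sketch with illustrative names and values only:

```scala
// A file may contain a row matching `transform(col) IN (v1, v2, ...)` iff
// at least one literal falls within [transform(min), transform(max)]
object InPruningSketch extends App {
  final case class ColStats(min: String, max: String)

  def mayMatch(stats: ColStats, literals: Seq[String]): Boolean =
    literals.exists(v => stats.min <= v && v <= stats.max)

  val stats = ColStats("2022-03-08", "2022-03-09")
  println(mayMatch(stats, Seq("2022-03-08", "2022-03-10"))) // true  -- keep the file
  println(mayMatch(stats, Seq("2022-03-10")))               // false -- prune it
}
```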
@hudi-bot run azure
Force-pushed fccd455 to ffb38fb
@nsivabalan we can land this once CI passes
…xpressions (in lieu of just "literals"); Tidying up
… any other attributes or holding sub-query exprs
…ort arbitrary expressions involving no more than single column; Extracted common Column Stsat expression utils;
…ie expression containing exactly one attribute reference, and no sub-queries)
Added test for fallback
Added casting into list of allowed transformation exprs
Scaffolded `SparkAdapter` wiring
… 3.2.x; Repurposed `HoodieSpark3CatalystExpressionUtils` to support Spark 3.1.x (and maybe 3.0.x)
Scaffolded `Spark3_1Adapter`
…lumnRangeMetadata`
Force-pushed ffb38fb to 5bdb3ee
…in Data Skipping flow (apache#4996)
What is the purpose of the pull request
Supporting Composite Expressions (including standard Spark functions and UDFs) as Filter Expressions in Data Skipping flow.
For example, where previously we only supported rather simple expressions over Data Table attributes like `WHERE columnA = 42`, we now support a much broader scope of expressions (strictly defined below), allowing Data Skipping to properly digest queries like `WHERE date_format(columnA, 'MM/dd/yyyy') = '01/01/2022'` referencing Spark's standard function `date_format`.

Formally, now supported is any source-column expression such that it
- references exactly one attribute (column) of the table and contains no sub-query expressions (hence multi-column expressions like `A * B = 0` are not supported)
- preserves the ordering of the source column's values (ie is one of the whitelisted order-preserving transformations)

Also, now as "value expression" we support any expression such that it
- references no table attributes and holds no sub-query expressions (hence column-to-column `A = B` filters are not supported)

Brief change log
- Fixed `resolveExpr` util to properly resolve Spark functions

Verify this pull request
This pull request is already covered by existing tests, such as (please describe tests).
This change added tests
Committer checklist
- Has a corresponding JIRA in PR title & commit
- Commit message is descriptive of the change
- CI is green
- Necessary doc changes done or have another open PR
For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.