[HUDI-3567] Refactor HoodieCommonUtils to make code more reasonable #4982
Conversation
@alexeykudinkin Could you help me review this PR? Thanks.
@hudi-bot run azure
```scala
    FileStatusCache.getOrCreate(sparkSession))

// Resolve partition predicates, only conjunctive predicates are supported
val partitionPredicate = DataSkippingUtils.resolveFilterExpr(sparkSession, predicate,
```
Let's create HoodieCatalystExpressionUtils and move resolveFilterExpr in there
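For illustration, a minimal sketch of what such a utility could look like: it resolves a filter string against a schema by running Spark's analyzer over a `Filter` plan on a dummy relation. The object name comes from the suggestion above; everything else is an assumption, not the PR's actual code.

```scala
package org.apache.spark.sql.hudi

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LocalRelation}
import org.apache.spark.sql.types.StructType

object HoodieCatalystExpressionUtils {

  // Parse the filter string, then resolve it against `tableSchema` by
  // analyzing a Filter plan over a relation carrying that schema.
  def resolveFilterExpr(spark: SparkSession, exprStr: String, tableSchema: StructType): Expression = {
    val parsed = spark.sessionState.sqlParser.parseExpression(exprStr)
    val analyzed = spark.sessionState.analyzer.execute(
      Filter(parsed, LocalRelation(tableSchema.toAttributes)))
    analyzed.asInstanceOf[Filter].condition
  }
}
```

Note that `SparkSession.sessionState` is `private[sql]`, which is why the file has to live under Spark's own package tree (see the packaging comment further down in this review).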
Done
```scala
// Get all partitions and prune partition by predicates
val partitionPaths = hoodieFileIndex.getAllCachedPartitionPaths.asScala.toSeq
val prunedPartitions = hoodieFileIndex.prunePartition(partitionPaths, predicates._2)
```
Instead of invoking pruning here directly, let's encapsulate pruning within the index itself and then expose the following API:

```scala
class SparkHoodieTableFileIndex {
  def getPartitionPaths(predicates: Seq[Expression]): Seq[PartitionPath] = {
    // prune internally
    ???
  }
}
```
Done
```scala
// Resolve partition predicates, only conjunctive predicates are supported
val partitionPredicate = DataSkippingUtils.resolveFilterExpr(sparkSession, predicate,
  hoodieFileIndex.partitionSchema)
val predicates = splitPredicates(partitionPredicate)
```
I see that we're splitting, but we don't separate between data filters and partition filters.
And there's actually a util doing what you need: `splitPartitionAndDataPredicates`.
Now using the `HoodieDataSourceHelper#splitPartitionAndDataPredicates` method to split data filters and partition filters, and added more tests.
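For readers following along, a rough sketch of what such a split typically does; this is an assumption about the helper's behavior, not its verbatim source. A predicate counts as a partition predicate only if every attribute it references is a partition column:

```scala
import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression}

// Split `predicates` into (partitionPredicates, dataPredicates).
def splitPartitionAndDataPredicates(
    predicates: Seq[Expression],
    partitionColumns: Set[String]): (Seq[Expression], Seq[Expression]) = {
  predicates.partition { predicate =>
    // Subqueries can't be evaluated against partition values alone.
    !SubqueryExpression.hasSubquery(predicate) &&
      predicate.references.forall(ref => partitionColumns.contains(ref.name))
  }
}
```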
```scala
 * @return The pair of disjunctive predicates and conjunctive predicates
 */
private def splitPredicates(condition: Expression): (Seq[Expression], Seq[Expression]) = {
  condition match {
```
Please check my comment above regarding splitting
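For reference, conjunct splitting in Catalyst boils down to recursing through `And` nodes; the sketch below mirrors `PredicateHelper.splitConjunctivePredicates` (the disjunctive variant recurses through `Or` the same way):

```scala
import org.apache.spark.sql.catalyst.expressions.{And, Expression}

def splitConjunctivePredicates(condition: Expression): Seq[Expression] = condition match {
  case And(left, right) =>
    splitConjunctivePredicates(left) ++ splitConjunctivePredicates(right)
  case other => other :: Nil
}
```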
```diff
 protected def parsePartitionColumnValues(partitionColumns: Array[String], partitionPath: String): Array[Object] = {
-  HoodieCommonUtils.parsePartitionColumnValues(sparkParsePartitionUtil, configProperties,
-    basePath, partitionSchema, partitionColumns, partitionPath)
+  if (partitionColumns.length == 0) {
```
You're just moving this code back, right? There are no additional changes, are there?
Yes, I'm just moving this code back with no additional changes.
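For context, a hedged sketch of the shape of the moved-back method: the non-partitioned short-circuit comes from the diff above, while the partitioned branch is elided because its details aren't shown here.

```scala
def parsePartitionColumnValues(partitionColumns: Array[String],
                               partitionPath: String): Array[Object] = {
  if (partitionColumns.length == 0) {
    // Non-partitioned table: there are no partition values to parse.
    Array.empty[Object]
  } else {
    // Partitioned table: parse the values encoded in the relative
    // partition path, e.g. "region=US/dt=2022-03-01". Elided here.
    ???
  }
}
```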
```scala
 * @param predicates The filter condition.
 * @return The pruned partition paths.
 */
def prunePartition(partitionPaths: Seq[PartitionPath], predicates: Seq[Expression]): Seq[PartitionPath] = {
```
Please check my comment above, and let's make this method private
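A minimal sketch of how such pruning can work, using Catalyst's `Predicate` to evaluate the bound partition filters against each partition's values. `partitionSchema` and the `values` accessor on `PartitionPath` are assumptions here, and real code would also need to convert the values to Catalyst's internal representation:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{And, Expression, Predicate}

private def prunePartition(partitionPaths: Seq[PartitionPath],
                           predicates: Seq[Expression]): Seq[PartitionPath] = {
  if (predicates.isEmpty) {
    partitionPaths
  } else {
    // Bind the combined predicate to the partition schema's attributes.
    val boundPredicate = Predicate.create(predicates.reduce(And), partitionSchema.toAttributes)
    boundPredicate.initialize(0)
    // Keep only the partitions whose values satisfy the predicate.
    partitionPaths.filter(p => boundPredicate.eval(InternalRow.fromSeq(p.values)))
  }
}
```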
```scala
 * under the License.
 */

package org.apache.spark.sql.hudi
```
Please move it to org.apache.spark.sql (so that we can access some package-private APIs)
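The reason: Spark marks a number of useful members `private[sql]` (for example `SparkSession.sessionState`), and Scala's qualified-private access lets code declared under `org.apache.spark.sql` (including its subpackages) reach them. A tiny illustrative sketch; the object name is hypothetical:

```scala
package org.apache.spark.sql

object PackagePrivateAccessExample {
  // Compiles only because this file lives under org.apache.spark.sql:
  // SparkSession.sessionState is declared private[sql].
  def parser(spark: SparkSession) = spark.sessionState.sqlParser
}
```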
@huberylee thanks for cleaning things up!
@hudi-bot run azure
@alexeykudinkin All comments have been addressed.
What is the purpose of the pull request

Refactor `HoodieCommonUtils` to make the code more reasonable.

Brief change log

- Refactor `SparkHoodieTableFileIndex`, `BaseHoodieTableFileIndex` and `HoodieFileIndex`
- Move the `resolveFilterExpr` method to `DataSkippingUtils`
- Rename `HoodieCommonUtils` to `HoodieCLIUtils`
- Use `HoodieFileIndex` to adapt partition pruning in `RunClusteringProcedure` (see the usage sketch below)
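As a hypothetical usage note, the partition-pruning adaptation would let a clustering call restrict itself to matching partitions. The argument names below are assumptions, not verified against the procedure's actual signature:

```scala
// Hypothetical call; 'predicate' as an argument name is an assumption.
spark.sql("CALL run_clustering(table => 'hudi_tbl', predicate => \"dt = '2022-03-01'\")")
```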
Verify this pull request

This pull request is already covered by existing tests, such as `TestHoodieFileIndex` and `TestCallProcedure`.

Committer checklist
- Has a corresponding JIRA in PR title & commit
- Commit message is descriptive of the change
- CI is green
- Necessary doc changes done or have another open PR
For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.