-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-41183][SQL] Add an extension API to do plan normalization for caching #38692
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
| * <ul> | ||
| * <li>Analyzer Rules.</li> | ||
| * <li>Check Analysis Rules.</li> | ||
| * <li>Plan Normalization Rules.</li> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cache Plan Normalization Rules? Since it is used for cache only.
| case other => other | ||
| } | ||
|
|
||
| lazy val normalized: LogicalPlan = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps adding some comments for this.
| df.select("i").filter($"i" > 1).cache() | ||
| assert(df.filter($"i" > 1).select("i").queryExecution.executedPlan.find { | ||
| case _: org.apache.spark.sql.execution.columnar.InMemoryTableScanExec => true | ||
| case _ => false |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So without the added rule, caching is unable to apply here, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yup
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we add a negative test that verifies this? Might be overkill...
viirya
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems like a good idea to me.
| planChangeLogger.logRule(rule.ruleName, p, result) | ||
| result | ||
| }) | ||
| if (normalizationRules.nonEmpty) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think entering this else block means normalizationRules is non empty, no?
| private[this] val planNormalizationRules = mutable.Buffer.empty[RuleBuilder] | ||
|
|
||
| def buildPlanNormalizationRules(session: SparkSession): Seq[Rule[LogicalPlan]] = { | ||
| planNormalizationRules.map(_.apply(session)).toSeq |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: isn't .apply(...) redundant with just (...) ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm following the existing code style in this file. I assume the reason is people who are not familiar with Scala may be confused when reading the code .map(_(session))
| commandExecuted | ||
| } else { | ||
| val planChangeLogger = new PlanChangeLogger[LogicalPlan]() | ||
| val normalized = normalizationRules.foldLeft(commandExecuted)((p, rule) => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: (... => { ... }) is equivalent to just { ... => ... } in this context
|
thanks for the review, merging to master! |
dongjoon-hyun
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1, late LGTM. Thank you, @cloud-fan and all.
…tionRules to injectPlanNormalizationRule ### What changes were proposed in this pull request? Followup of #38692. To follow other APIs in `SparkSessionExtensions`, the name should be `inject...Rule` and `build...Rules`. ### Why are the changes needed? typo fix ### Does this PR introduce _any_ user-facing change? not a released API ### How was this patch tested? n/a Closes #38767 from cloud-fan/small. Authored-by: Wenchen Fan <[email protected]> Signed-off-by: Hyukjin Kwon <[email protected]>
…caching ### What changes were proposed in this pull request? Today, Spark is very conservative and uses the analyzed plan instead of the optimized plan as the cache key. Many cache opportunities are missed. This PR updates `SparkSessionExtensions` to allow people to inject custom plan normalization rules. Users can pick some safe optimizer rules, or implement new rules based on their business needs, to do plan normalization and increase the cache hit rate. ### Why are the changes needed? allow advanced users to do caching better. ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? new test Closes apache#38692 from cloud-fan/cache. Authored-by: Wenchen Fan <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
…caching ### What changes were proposed in this pull request? Today, Spark is very conservative and uses the analyzed plan instead of the optimized plan as the cache key. Many cache opportunities are missed. This PR updates `SparkSessionExtensions` to allow people to inject custom plan normalization rules. Users can pick some safe optimizer rules, or implement new rules based on their business needs, to do plan normalization and increase the cache hit rate. ### Why are the changes needed? allow advanced users to do caching better. ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? new test Closes apache#38692 from cloud-fan/cache. Authored-by: Wenchen Fan <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
…tionRules to injectPlanNormalizationRule ### What changes were proposed in this pull request? Followup of apache#38692. To follow other APIs in `SparkSessionExtensions`, the name should be `inject...Rule` and `build...Rules`. ### Why are the changes needed? typo fix ### Does this PR introduce _any_ user-facing change? not a released API ### How was this patch tested? n/a Closes apache#38767 from cloud-fan/small. Authored-by: Wenchen Fan <[email protected]> Signed-off-by: Hyukjin Kwon <[email protected]>
…caching ### What changes were proposed in this pull request? Today, Spark is very conservative and uses the analyzed plan instead of the optimized plan as the cache key. Many cache opportunities are missed. This PR updates `SparkSessionExtensions` to allow people to inject custom plan normalization rules. Users can pick some safe optimizer rules, or implement new rules based on their business needs, to do plan normalization and increase the cache hit rate. ### Why are the changes needed? allow advanced users to do caching better. ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? new test Closes apache#38692 from cloud-fan/cache. Authored-by: Wenchen Fan <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
…tionRules to injectPlanNormalizationRule ### What changes were proposed in this pull request? Followup of apache#38692. To follow other APIs in `SparkSessionExtensions`, the name should be `inject...Rule` and `build...Rules`. ### Why are the changes needed? typo fix ### Does this PR introduce _any_ user-facing change? not a released API ### How was this patch tested? n/a Closes apache#38767 from cloud-fan/small. Authored-by: Wenchen Fan <[email protected]> Signed-off-by: Hyukjin Kwon <[email protected]>
What changes were proposed in this pull request?
Today, Spark is very conservative and uses the analyzed plan instead of the optimized plan as the cache key. Many cache opportunities are missed.
This PR updates
SparkSessionExtensionsto allow people to inject custom plan normalization rules. Users can pick some safe optimizer rules, or implement new rules based on their business needs, to do plan normalization and increase the cache hit rate.Why are the changes needed?
allow advanced users to do caching better.
Does this PR introduce any user-facing change?
no
How was this patch tested?
new test