-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-32430][SQL] Extend SparkSessionExtensions to inject rules into AQE query stage preparation #29224
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
ok to test |
|
@cloud-fan this is fixing another issue with AQE (#29134) and overriding would your PR cover something like this as well? |
|
Test build #126503 has finished for PR 29224 at commit
|
| case class MyNewQueryStageRule() extends Rule[SparkPlan] { | ||
| override def apply(plan: SparkPlan): SparkPlan = plan.transformDown { | ||
| case plan if !plan.isInstanceOf[AdaptiveSparkPlanExec] => | ||
| assert(plan.getTagValue(QueryPrepRuleHelper.myPrepTag).get == QueryPrepRuleHelper.myPrepTagValue) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
need to wrap this line
tgravescs
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changes look good, pending Jenkins. Would be great for others to take a look.
|
Thank you, @andygrove and @tgravescs . I'll take a look, too. |
| type ParserBuilder = (SparkSession, ParserInterface) => ParserInterface | ||
| type FunctionDescription = (FunctionIdentifier, ExpressionInfo, FunctionBuilder) | ||
| type ColumnarRuleBuilder = SparkSession => ColumnarRule | ||
| type QueryStagePrepRuleBuilder = SparkSession => Rule[SparkPlan] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please update the document (at line 47) accordingly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks. I pushed an update for this.
dongjoon-hyun
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
According to the PR content, this is introducing new API and is a kind of improvement. Can we consider SPARK-32430 as a Improvement instead of Bug? Or, do you think this should be in branch-3.0, @andygrove and @tgravescs ?
|
thanks for looking @dongjoon-hyun, I went back and forth on the improvement vs bug. I can see it both ways, I decided to file it as a bug because without it we can't properly override AQE as intended with the original change. It would be nice to pull these back to branch-3.0 as well since many people are starting to use AQE there and was thinking the change was pretty isolated to that code path. |
|
Got it. Thank you for explanation, @tgravescs . cc @cloud-fan and @gatorsmile since this aims to land on |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1, LGTM. Thank you, @andygrove and @tgravescs .
GitHub Action passed. Merged to master/branch-3.0.
… AQE query stage preparation ### What changes were proposed in this pull request? Provide a generic mechanism for plugins to inject rules into the AQE "query prep" stage that happens before query stage creation. This goes along with https://issues.apache.org/jira/browse/SPARK-32332 where the current AQE implementation doesn't allow for users to properly extend it for columnar processing. ### Why are the changes needed? The issue here is that we create new query stages but we do not have access to the parent plan of the new query stage so certain things can not be determined because you have to know what the parent did. With this change it would allow you to add TAGs to be able to figure out what is going on. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? A new unit test is included in the PR. Closes #29224 from andygrove/insert-aqe-rule. Authored-by: Andy Grove <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]> (cherry picked from commit 64a01c0) Signed-off-by: Dongjoon Hyun <[email protected]>
|
Test build #126504 has finished for PR 29224 at commit
|
|
Test build #126505 has finished for PR 29224 at commit
|
|
late LGTM. cc @maryannxue @JkSelf as well. |
| // A list of physical plan rules to be applied before creation of query stages. The physical | ||
| // plan should reach a final status of query stages (i.e., no more addition or removal of | ||
| // Exchange nodes) after running these rules. | ||
| private def queryStagePreparationRules: Seq[Rule[SparkPlan]] = Seq( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It sounds like this function should be moved out of the physical node AdaptiveSparkPlanExec? cc @maryannxue @cloud-fan
| createClone: (SparkSession, SessionState) => SessionState, | ||
| val columnarRules: Seq[ColumnarRule]) { | ||
| val columnarRules: Seq[ColumnarRule], | ||
| val queryStagePrepRules: Seq[Rule[SparkPlan]]) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SessionState is a critical concept/class in Spark SQL. Adding queryStagePrepRules into SessionState looks weird to me. WDYT?
cc @hvanhovell
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Preferably both columnarRules and queryStagePrepRules should be passed directly to the thing that is using them. This is not entirely easy because of how we deal with post-planning rules in QueryExecution. It would be great if someone could move those into the builder.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's very hard. The extension rules are injected in the BaseSessionStateBuilder, and we need to carry it in SessionState to propagate it further. The custom columnarRules and queryStagePrepRules are no different from custom analyzer/optimizer/planner rules.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is just refactoring right (famous last words)? You pull out the rules from query execution, put them in the session state builder, and manipulate them there.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe I missed something. The session state builder is used to build SessionState. SparkSession or QueryExecition doesn't hold session state builder instance anymore.
Are you suggesting we create something like Analyzer, Optimizer, etc. to wrap the post-planning rules in BaseSessionStateBuilder, so that we just pass that wrapper around?
| type ParserBuilder = (SparkSession, ParserInterface) => ParserInterface | ||
| type FunctionDescription = (FunctionIdentifier, ExpressionInfo, FunctionBuilder) | ||
| type ColumnarRuleBuilder = SparkSession => ColumnarRule | ||
| type QueryStagePrepRuleBuilder = SparkSession => Rule[SparkPlan] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a public API. I think we also need to add a version information.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is marked as unstable and experimental. Do we need to add version information in that case? I am fine with doing so, but then we should add it everywhere.
| } | ||
|
|
||
| /** | ||
| * Inject a rule that can override the the query stage preparation phase of adaptive query |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: the the => the
| private def queryStagePreparationRules: Seq[Rule[SparkPlan]] = Seq( | ||
| ensureRequirements | ||
| ) | ||
| ) ++ context.session.sessionState.queryStagePrepRules |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we do this after ensure requirements? The queryStagePrepRules might break requirements.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The custom rules may need to see exchange nodes as well. We can do validation at the end to make sure exchange requirements are still required.
What changes were proposed in this pull request?
Provide a generic mechanism for plugins to inject rules into the AQE "query prep" stage that happens before query stage creation.
This goes along with https://issues.apache.org/jira/browse/SPARK-32332 where the current AQE implementation doesn't allow for users to properly extend it for columnar processing.
Why are the changes needed?
The issue here is that we create new query stages but we do not have access to the parent plan of the new query stage so certain things can not be determined because you have to know what the parent did. With this change it would allow you to add TAGs to be able to figure out what is going on.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
A new unit test is included in the PR.