[SPARK-32808][SQL] Pass all test of sql/core module in Scala 2.13 #29711
Conversation
cc @srowen, this patch fixes the remaining failed cases of the sql/core module with Scala 2.13; after this patch, all tests pass now.
srowen
left a comment
I'd like @cloud-fan or @gatorsmile to review just because it changes so many of the expected results.
// SPARK-32687: Change to use `LinkedHashMap` to make sure that items are
// inserted and iterated in the same order.
val ret = new mutable.LinkedHashMap[Set[Int], JoinPlan]()
idToJoinPlanSeq.foreach(v => ret.put(v._1, v._2))
I think you can just construct this above, with a .foreach instead of a .map? Or just call .addAll on the result of the .map.
I guess LinkedHashMap takes more memory, but that shouldn't matter much here.
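A rough sketch of what this suggestion might look like. Here JoinPlan and idToJoinPlanSeq are simplified stand-ins for the real Catalyst types in CostBasedJoinReorder.scala, so this is illustrative rather than the actual patch:

```scala
import scala.collection.mutable

// Hypothetical stand-ins so the sketch is self-contained; the real
// JoinPlan and idToJoinPlanSeq live in CostBasedJoinReorder.scala.
case class JoinPlan(itemIds: Set[Int])
val idToJoinPlanSeq: Seq[(Set[Int], JoinPlan)] =
  Seq(Set(1) -> JoinPlan(Set(1)), Set(2) -> JoinPlan(Set(2)))

// Option A: build the LinkedHashMap directly with foreach, skipping the
// intermediate collection entirely.
val byForeach = new mutable.LinkedHashMap[Set[Int], JoinPlan]()
idToJoinPlanSeq.foreach { case (ids, plan) => byForeach.put(ids, plan) }

// Option B: append the whole sequence at once; `++=` works on both
// Scala 2.12 and 2.13 (`addAll` is the 2.13 name for the same operation).
val byAppend = new mutable.LinkedHashMap[Set[Int], JoinPlan]()
byAppend ++= idToJoinPlanSeq
```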
Got it ~ Addressed in 10d4953.
/** Map[set of item ids, join plan for these items] */
- type JoinPlanMap = Map[Set[Int], JoinPlan]
+ type JoinPlanMap = mutable.LinkedHashMap[Set[Int], JoinPlan]
I think this is OK, though it's not strictly necessary to make the type def this specific
Change to use LinkedHashMap instead of Map to store foundPlans in the JoinReorderDP.search method, to ensure that iteration order matches insertion order, because the iteration order of Map behaves differently under Scala 2.12 and 2.13.

Can we make this a separate PR? The plan changes need more reviews.
Do you mean to keep using Map[Set[Int], JoinPlan], or not to define the JoinPlanMap type at all?
@gatorsmile Use a separate JIRA number?
Same JIRA I think, just a separate PR. I'm not sure it matters that much though; it's a tiny part of the change, really. We just need more eyes on the plan changes.
Same JIRA I think, just a separate PR. I'm not sure it matters that much though; it's a tiny part of the change, really. We just need more eyes on the plan changes.
It seems there is no way to split this into two PRs, because if we change CostBasedJoinReorder only, the test cases in the sql/core module will fail.
So all expected plan changes are due to the use of LinkedHashMap instead of Map.
Test build #128507 has finished for PR 29711 at commit
@gatorsmile @srowen @cloud-fan already made this a separate PR: SPARK-32848, #29717
Test build #128530 has finished for PR 29711 at commit
: : : +- * Filter (8)
: : : +- * ColumnarToRow (7)
: : : +- Scan parquet default.store_sales (6)
: : : +- * BroadcastHashJoin Inner BuildRight (9)
I'm skimming the changes, and this seems non-trivial? But then again, I am not sure I can read these plans expertly enough to evaluate them. The plan below starts by scanning a different table, for example.
We may just have to accept changes like this, if they're equivalent, to make sure they do not depend on implementation details of hash maps. But @gatorsmile @cloud-fan et al, do these look like plausible equivalent plans, for example?
Exchange [ss_customer_sk] #2
WholeStageCodegen (4)
Project [i_brand_id,i_brand,i_manufact_id,i_manufact,ss_customer_sk,ss_ext_sales_price,s_zip]
Project [ss_customer_sk,ss_ext_sales_price,i_brand_id,i_brand,i_manufact_id,i_manufact,s_zip]
Would this kind of thing possibly affect the order of columns from a select * or is that accounted for elsewhere?
spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
Lines 298 to 308 in 99384d1
val newJoin = Join(left, right, Inner, joinConds.reduceOption(And), JoinHint.NONE)
val collectedJoinConds = joinConds ++ oneJoinPlan.joinConds ++ otherJoinPlan.joinConds
val remainingConds = conditions -- collectedJoinConds
val neededAttr = AttributeSet(remainingConds.flatMap(_.references)) ++ topOutput
val neededFromNewJoin = newJoin.output.filter(neededAttr.contains)
val newPlan =
  if ((newJoin.outputSet -- neededFromNewJoin).nonEmpty) {
    Project(neededFromNewJoin, newJoin)
  } else {
    newJoin
  }
The Project node is added by the code above in the JoinReorderDP#buildJoin method; I think the Project output order is decided by the join order.
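A toy sketch of that point, using plain Scala stand-ins rather than actual Catalyst classes: a join's output is roughly left.output ++ right.output, so the Project built from it inherits whichever join order the search happened to pick.

```scala
// Illustrative only: simplified stand-ins for Catalyst's plan/attribute types.
case class Attr(name: String)
case class Rel(output: Seq[Attr])

// Mirrors the idea that Join(left, right, ...).output == left.output ++ right.output.
def joinOutput(left: Rel, right: Rel): Seq[Attr] = left.output ++ right.output

val item  = Rel(Seq(Attr("i_brand_id"), Attr("i_brand")))
val sales = Rel(Seq(Attr("ss_customer_sk"), Attr("ss_ext_sales_price")))

// "item JOIN sales" vs "sales JOIN item": same rows, different column order,
// hence the reordered Project columns in the regenerated golden plan files.
println(joinOutput(item, sales).map(_.name)) // List(i_brand_id, i_brand, ss_customer_sk, ss_ext_sales_price)
println(joinOutput(sales, item).map(_.name)) // List(ss_customer_sk, ss_ext_sales_price, i_brand_id, i_brand)
```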
cloud-fan
left a comment
The change is safe, as it just switches to LinkedHashMap. I looked at a few plan changes and they are indeed no-ops, e.g. changing from "A join B" to "B join A" without changing the build side.
@srowen @gatorsmile Is there any other problem in this PR that needs to be fixed? It seems that @cloud-fan thinks the change is safe.
I'll merge this tomorrow if there are no more objections.
Merged to master
thx~ @srowen @cloud-fan @gatorsmile
What changes were proposed in this pull request?
After #29660 and #29689, there are 13 remaining failed test cases in the sql/core module with Scala 2.13.
The reason for the remaining failures is that the optimization result of CostBasedJoinReorder may differ for the same input between Scala 2.12 and Scala 2.13 when there is more than one candidate plan with the same cost. This PR makes the optimization result as deterministic as possible so that all remaining failed cases of the sql/core module pass in Scala 2.13. The main changes of this PR are as follows:

- Change to use LinkedHashMap instead of Map to store foundPlans in the JoinReorderDP.search method, so that iteration order matches insertion order, because the iteration order of Map behaves differently under Scala 2.12 and 2.13 (a small sketch of the iteration-order difference follows this list).
- Fix StarJoinCostBasedReorderSuite, which is affected by the above change.
- Regenerate the golden files affected by the above change.
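A minimal, self-contained sketch of that iteration-order difference, using toy keys rather than real JoinPlan entries; the exact order a plain Map produces depends on the Scala version and its underlying hash map:

```scala
import scala.collection.mutable

// With more than four entries, the default immutable Map is backed by a
// hash map, whose iteration order is an implementation detail and changed
// between Scala 2.12 and 2.13.
val insertionOrder = Seq(Set(1), Set(2), Set(3), Set(4), Set(5))

val plain = Map(insertionOrder.map(k => k -> k.toString): _*)
println(plain.keys.toList)   // order is not guaranteed and may differ across 2.12/2.13

// LinkedHashMap always iterates in insertion order, so the join-reorder
// search enumerates candidate plans the same way on both Scala versions.
val linked = mutable.LinkedHashMap(insertionOrder.map(k => k -> k.toString): _*)
println(linked.keys.toList)  // List(Set(1), Set(2), Set(3), Set(4), Set(5))
```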
Why are the changes needed?
We need to support a Scala 2.13 build.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Scala 2.12: Pass the Jenkins or GitHub Actions checks.
Scala 2.13: All tests passed.
Do the following:
Before
After