[SPARK-27225][SQL] Implement join strategy hints #24164
|
Test build #103754 has finished for PR 24164 at commit
|
|
retest this please |
|
Test build #103759 has finished for PR 24164 at commit
|
|
retest this please |
|
ok to test |
|
test this please |
|
Test build #103776 has started for PR 24164 at commit |
|
test this please |
|
Test build #103777 has finished for PR 24164 at commit
|
|
In the conflict case, do we need to resolve it implicitly? For complicated queries, I think it becomes difficult for users to understand the hint behaviours.
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
| case object SHUFFLE_REPLICATE_NL extends JoinStrategyHint { | ||
| override def displayName: String = "shuffle-replicate-nested-loop" | ||
| override def hintAliases: Set[String] = Set( | ||
| "SHUFFLE_REPLICATE_NL") |
Is this hint for cartesian products useful for users?
Yes. In the default logic, broadcast-nl is prioritized over shuffle-replicate-nl (cartesian-product), so this can be used for special cases where shuffle-replicate-nl is favored.
I think we might need a code comment to explain that SHUFFLE_REPLICATE_NL is the cartesian product.
|
We also need to update the document; |
Thank you for pointing this out! I'll work on that. |
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
|
Test build #103937 has finished for PR 24164 at commit
|
|
|
||
| // broadcast hints were not specified, so need to infer it from size and configuration. | ||
| // broadcast hints specified with no equi-join keys, use broadcast-nested-loop | ||
| case j @ logical.Join(left, right, joinType, condition, hint) |
remove 'j @' ?
|
How about the Dataset Hint API? For example, |
|
@gatorsmile Added more tests. Please review. |
|
Test build #104259 has finished for PR 24164 at commit
|
|
retest this please |
|
Test build #104264 has finished for PR 24164 at commit
|
| * Supports both equi-joins and non-equi-joins. | ||
| * Supports only inner like joins. | ||
| * | ||
| * First, look at applicable join strategies hints: |
Add "based on the following precedence".
| } | ||
|
|
||
| override def toString: String = { | ||
| val hints = scala.collection.mutable.ArrayBuffer.empty[String] |
nit: we don't need to create an array buffer here.
|
Test build #104401 has finished for PR 24164 at commit
|
|
Test build #104392 has finished for PR 24164 at commit
|
|
@maropu I added "hint error handling point" in the hint resolving and hint-node elimination stage, with default behavior as logging warnings. You can refine them and probably add configuration-based error handling in your other PR. |
|
Test build #104418 has finished for PR 24164 at commit
|
| logWarning(s"A join hint $hint is specified but it is not part of a join relation.") | ||
| } | ||
|
|
||
| private def handleOverriddenHintInfo(hint: HintInfo): Unit = { |
It's a little weird to see this method being defined twice. Can we just log the message inside HintInfo.merge?
I was thinking of having a centralized handler for all kinds of hint events/errors, where the action (logging warnings/errors or throwing exceptions) is configurable. WDYT?
| * in this [[HintInfo]] if defined, otherwise the strategy in the other [[HintInfo]]. | ||
| * Combine this [[HintInfo]] with another [[HintInfo]] and return the new [[HintInfo]]. | ||
| * @param other the other [[HintInfo]] | ||
| * @param hintOverriddenCallback a callback to notify if any [[HintInfo]] has been overridden |
if we create a hint merging strategy framework, I think it will not be an arbitrary callback. Shall we make it simple now and leave it for future design? Then we can just log message inside this method.
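For illustration only, the simpler option suggested here (logging directly inside the merge method, with no callback parameter) could look roughly like the sketch below. `SimpleHintInfo` is a hypothetical stand-in that only carries an optional strategy name; it is not Spark's `HintInfo`, and `Console.err.println` stands in for `logWarning`.

```scala
object HintMergeSketch {
  // Hypothetical stand-in for HintInfo: it only carries an optional strategy name.
  final case class SimpleHintInfo(strategy: Option[String]) {

    // Keep this hint's strategy if defined, otherwise take the other one.
    // When both sides define a strategy, the other one is overridden, and the
    // warning is emitted right here instead of through a caller-supplied callback.
    def merge(other: SimpleHintInfo): SimpleHintInfo = {
      if (strategy.isDefined && other.strategy.isDefined) {
        // Stand-in for logWarning in this self-contained sketch.
        Console.err.println(
          s"Join hint ${other.strategy.get} is overridden by ${strategy.get}.")
      }
      SimpleHintInfo(strategy.orElse(other.strategy))
    }
  }

  def main(args: Array[String]): Unit = {
    val merged = SimpleHintInfo(Some("SHUFFLE_HASH")).merge(SimpleHintInfo(Some("SHUFFLE_MERGE")))
    println(merged)
  }
}
```

The trade-off is the one raised in this thread: logging in place keeps the method signature small, while a centralized handler would make the action configurable.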
|
Test build #104420 has finished for PR 24164 at commit
|
|
Test build #104457 has finished for PR 24164 at commit
|
|
Test build #104468 has finished for PR 24164 at commit
|
|
retest this please |
|
Test build #104474 has finished for PR 24164 at commit
|
|
Test build #104481 has finished for PR 24164 at commit
|
|
Test build #104500 has finished for PR 24164 at commit
|
|
Test build #104498 has finished for PR 24164 at commit
|
|
retest this please |
|
Test build #104511 has finished for PR 24164 at commit
|
|
thanks, merging to master! |
## What changes were proposed in this pull request?
This PR extends the existing BROADCAST join hint (for both broadcast-hash join and broadcast-nested-loop join) by implementing other join strategy hints corresponding to the rest of Spark's existing join strategies: shuffle-hash, sort-merge, cartesian-product. The hint names: SHUFFLE_MERGE, SHUFFLE_HASH, SHUFFLE_REPLICATE_NL are partly different from the code names in order to make them clearer to users and reflect the actual algorithms better.
The hinted strategy will be used for the join with which it is associated if it is applicable/doable.
Conflict resolving rules in case of multiple hints:
1. Conflicts within either side of the join: take the first strategy hint specified in the query, or the top hint node in Dataset. For example, in "select /*+ merge(t1) */ /*+ broadcast(t1) */ k1, v2 from t1 join t2 on t1.k1 = t2.k2", take "merge(t1)"; in `df1.hint("merge").hint("shuffle_hash").join(df2)`, take "shuffle_hash". This is a general hint conflict resolving strategy, not specific to join strategy hint.
2. Conflicts between two sides of the join:
   a) In case of different strategy hints, hints are prioritized as `BROADCAST` over `SHUFFLE_MERGE` over `SHUFFLE_HASH` over `SHUFFLE_REPLICATE_NL`.
   b) In case of same strategy hints but conflicts in build side, choose the build side based on join type and size.
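Rules 1 and 2a can be modelled in a few lines of plain Scala. This is a minimal sketch under stated assumptions, not the code in this patch: the `Strategy` objects, `resolveSide`, and `resolveJoin` are hypothetical names, each side's hints are assumed to be listed top-down (so `df1.hint("merge").hint("shuffle_hash")` becomes `List(ShuffleHash, ShuffleMerge)`), and rule 2b (build side selection) is left out.

```scala
object JoinHintConflictSketch {
  // Hypothetical model of the four strategy hints; a lower number means higher priority.
  sealed abstract class Strategy(val priority: Int)
  case object Broadcast extends Strategy(0)
  case object ShuffleMerge extends Strategy(1)
  case object ShuffleHash extends Strategy(2)
  case object ShuffleReplicateNL extends Strategy(3)

  // Rule 1: within one side of the join, the topmost (first listed) hint wins.
  def resolveSide(hintsTopDown: List[Strategy]): Option[Strategy] =
    hintsTopDown.headOption

  // Rule 2a: between the two sides, the strategy with the highest priority wins.
  def resolveJoin(left: Option[Strategy], right: Option[Strategy]): Option[Strategy] =
    (left.toList ++ right.toList).sortBy(_.priority).headOption

  def main(args: Array[String]): Unit = {
    val left = resolveSide(List(ShuffleHash, ShuffleMerge)) // top node is shuffle_hash
    val right = resolveSide(List(Broadcast))
    println(resolveJoin(left, right)) // both sides hinted, priorities decide
    println(resolveJoin(left, None))  // only the left side is hinted
  }
}
```

Whether the selected strategy is actually used still depends on it being applicable to the join, as noted above.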
## How was this patch tested?
Added new UTs.
Closes apache#24164 from maryannxue/join-hints.
Lead-authored-by: maryannxue <[email protected]>
Co-authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>