Skip to content
24 changes: 16 additions & 8 deletions docs/sql-performance-tuning.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,22 @@ that these options will be deprecated in future release as more optimizations ar
</tr>
</table>

## Broadcast Hint for SQL Queries

The `BROADCAST` hint guides Spark to broadcast each specified table when joining them with another table or view.
When Spark deciding the join methods, the broadcast hash join (i.e., BHJ) is preferred,
even if the statistics is above the configuration `spark.sql.autoBroadcastJoinThreshold`.
When both sides of a join are specified, Spark broadcasts the one having the lower statistics.
Note Spark does not guarantee BHJ is always chosen, since not all cases (e.g. full outer join)
support BHJ. When the broadcast nested loop join is selected, we still respect the hint.
## Join Strategy Hints for SQL Queries

The join strategy hints, namely `BROADCAST`, `MERGE`, `SHUFFLE_HASH` and `SHUFFLE_REPLICATE_NL`,
instruct Spark to use the hinted strategy on each specified relation when joining them with another
relation. For example, when the `BROADCAST` hint is used on table 't1', broadcast join (either
broadcast hash join or broadcast nested loop join depending on whether there is any equi-join key)
with 't1' as the build side will be prioritized by Spark even if the size of table 't1' suggested
by the statistics is above the configuration `spark.sql.autoBroadcastJoinThreshold`.

When different join strategy hints are specified on both sides of a join, Spark prioritizes the
`BROADCAST` hint over the `MERGE` hint over the `SHUFFLE_HASH` hint over the `SHUFFLE_REPLICATE_NL`
hint. When both sides are specified with the `BROADCAST` hint or the `SHUFFLE_HASH` hint, Spark will
pick the build side based on the join type and the sizes of the relations.

Note that there is no guarantee that Spark will choose the join strategy specified in the hint since
a specific strategy may not support all join types.

<div class="codetabs">

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ class Analyzer(

lazy val batches: Seq[Batch] = Seq(
Batch("Hints", fixedPoint,
new ResolveHints.ResolveBroadcastHints(conf),
new ResolveHints.ResolveJoinStrategyHints(conf),
ResolveHints.ResolveCoalesceHints,
ResolveHints.RemoveAllHints),
Batch("Simple Sanity Check", Once,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package org.apache.spark.sql.catalyst.analysis

import java.util.Locale

import scala.collection.mutable

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions.IntegerLiteral
import org.apache.spark.sql.catalyst.plans.logical._
Expand All @@ -28,45 +30,66 @@ import org.apache.spark.sql.internal.SQLConf


/**
* Collection of rules related to hints. The only hint currently available is broadcast join hint.
* Collection of rules related to hints. The only hint currently available is join strategy hint.
*
* Note that this is separately into two rules because in the future we might introduce new hint
* rules that have different ordering requirements from broadcast.
* rules that have different ordering requirements from join strategies.
*/
object ResolveHints {

/**
* For broadcast hint, we accept "BROADCAST", "BROADCASTJOIN", and "MAPJOIN", and a sequence of
* relation aliases can be specified in the hint. A broadcast hint plan node will be inserted
* on top of any relation (that is not aliased differently), subquery, or common table expression
* that match the specified name.
* The list of allowed join strategy hints is defined in [[JoinStrategyHint.strategies]], and a
* sequence of relation aliases can be specified with a join strategy hint, e.g., "MERGE(a, c)",
* "BROADCAST(a)". A join strategy hint plan node will be inserted on top of any relation (that
* is not aliased differently), subquery, or common table expression that match the specified
* name.
*
* The hint resolution works by recursively traversing down the query plan to find a relation or
* subquery that matches one of the specified broadcast aliases. The traversal does not go past
* beyond any existing broadcast hints, subquery aliases.
* subquery that matches one of the specified relation aliases. The traversal does not go past
* beyond any view reference, with clause or subquery alias.
*
* This rule must happen before common table expressions.
*/
class ResolveBroadcastHints(conf: SQLConf) extends Rule[LogicalPlan] {
private val BROADCAST_HINT_NAMES = Set("BROADCAST", "BROADCASTJOIN", "MAPJOIN")
class ResolveJoinStrategyHints(conf: SQLConf) extends Rule[LogicalPlan] {
private val STRATEGY_HINT_NAMES = JoinStrategyHint.strategies.flatMap(_.hintAliases)

def resolver: Resolver = conf.resolver

private def applyBroadcastHint(plan: LogicalPlan, toBroadcast: Set[String]): LogicalPlan = {
private def createHintInfo(hintName: String): HintInfo = {
HintInfo(strategy =
JoinStrategyHint.strategies.find(
_.hintAliases.map(
_.toUpperCase(Locale.ROOT)).contains(hintName.toUpperCase(Locale.ROOT))))
}

private def applyJoinStrategyHint(
plan: LogicalPlan,
relations: mutable.HashSet[String],
hintName: String): LogicalPlan = {
// Whether to continue recursing down the tree
var recurse = true

val newNode = CurrentOrigin.withOrigin(plan.origin) {
plan match {
case u: UnresolvedRelation if toBroadcast.exists(resolver(_, u.tableIdentifier.table)) =>
ResolvedHint(plan, HintInfo(broadcast = true))
case r: SubqueryAlias if toBroadcast.exists(resolver(_, r.alias)) =>
ResolvedHint(plan, HintInfo(broadcast = true))
case ResolvedHint(u: UnresolvedRelation, hint)
if relations.exists(resolver(_, u.tableIdentifier.table)) =>
relations.remove(u.tableIdentifier.table)
ResolvedHint(u, createHintInfo(hintName).merge(hint, handleOverriddenHintInfo))
case ResolvedHint(r: SubqueryAlias, hint)
if relations.exists(resolver(_, r.alias)) =>
relations.remove(r.alias)
ResolvedHint(r, createHintInfo(hintName).merge(hint, handleOverriddenHintInfo))

case u: UnresolvedRelation if relations.exists(resolver(_, u.tableIdentifier.table)) =>
relations.remove(u.tableIdentifier.table)
ResolvedHint(plan, createHintInfo(hintName))
case r: SubqueryAlias if relations.exists(resolver(_, r.alias)) =>
relations.remove(r.alias)
ResolvedHint(plan, createHintInfo(hintName))

case _: ResolvedHint | _: View | _: With | _: SubqueryAlias =>
// Don't traverse down these nodes.
// For an existing broadcast hint, there is no point going down (if we do, we either
// won't change the structure, or will introduce another broadcast hint that is useless.
// For an existing strategy hint, there is no chance for a match from this point down.
// The rest (view, with, subquery) indicates different scopes that we shouldn't traverse
// down. Note that technically when this rule is executed, we haven't completed view
// resolution yet and as a result the view part should be deadcode. I'm leaving it here
Expand All @@ -80,25 +103,38 @@ object ResolveHints {
}

if ((plan fastEquals newNode) && recurse) {
newNode.mapChildren(child => applyBroadcastHint(child, toBroadcast))
newNode.mapChildren(child => applyJoinStrategyHint(child, relations, hintName))
} else {
newNode
}
}

private def handleOverriddenHintInfo(hint: HintInfo): Unit = {
logWarning(s"Join hint $hint is overridden by another hint and will not take effect.")
}

def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
case h: UnresolvedHint if BROADCAST_HINT_NAMES.contains(h.name.toUpperCase(Locale.ROOT)) =>
case h: UnresolvedHint if STRATEGY_HINT_NAMES.contains(h.name.toUpperCase(Locale.ROOT)) =>
if (h.parameters.isEmpty) {
// If there is no table alias specified, turn the entire subtree into a BroadcastHint.
ResolvedHint(h.child, HintInfo(broadcast = true))
// If there is no table alias specified, apply the hint on the entire subtree.
ResolvedHint(h.child, createHintInfo(h.name))
} else {
// Otherwise, find within the subtree query plans that should be broadcasted.
applyBroadcastHint(h.child, h.parameters.map {
// Otherwise, find within the subtree query plans to apply the hint.
val relationNames = h.parameters.map {
case tableName: String => tableName
case tableId: UnresolvedAttribute => tableId.name
case unsupported => throw new AnalysisException("Broadcast hint parameter should be " +
s"an identifier or string but was $unsupported (${unsupported.getClass}")
}.toSet)
case unsupported => throw new AnalysisException("Join strategy hint parameter " +
s"should be an identifier or string but was $unsupported (${unsupported.getClass}")
}
val relationNameSet = new mutable.HashSet[String]
relationNames.foreach(relationNameSet.add)

val applied = applyJoinStrategyHint(h.child, relationNameSet, h.name)
relationNameSet.foreach { n =>
logWarning(s"Count not find relation '$n' for join strategy hint " +
s"'${h.name}${relationNames.mkString("(", ", ", ")")}'.")
}
applied
}
}
}
Expand Down Expand Up @@ -135,7 +171,9 @@ object ResolveHints {
*/
object RemoveAllHints extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
case h: UnresolvedHint => h.child
case h: UnresolvedHint =>
logWarning(s"Unrecognized hint: ${h.name}${h.parameters.mkString("(", ", ", ")")}")
h.child
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ object CatalogTable {
/**
* This class of statistics is used in [[CatalogTable]] to interact with metastore.
* We define this new class instead of directly using [[Statistics]] here because there are no
* concepts of attributes or broadcast hint in catalog.
* concepts of attributes in catalog.
*/
case class CatalogStatistics(
sizeInBytes: BigInt,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,30 +30,58 @@ object EliminateResolvedHint extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = {
val pulledUp = plan transformUp {
case j: Join =>
val leftHint = mergeHints(collectHints(j.left))
val rightHint = mergeHints(collectHints(j.right))
j.copy(hint = JoinHint(leftHint, rightHint))
val (newLeft, leftHints) = extractHintsFromPlan(j.left)
val (newRight, rightHints) = extractHintsFromPlan(j.right)
val newJoinHint = JoinHint(mergeHints(leftHints), mergeHints(rightHints))
j.copy(left = newLeft, right = newRight, hint = newJoinHint)
}
pulledUp.transformUp {
case h: ResolvedHint => h.child
case h: ResolvedHint =>
handleInvalidHintInfo(h.hints)
h.child
}
}

/**
* Combine a list of [[HintInfo]]s into one [[HintInfo]].
*/
private def mergeHints(hints: Seq[HintInfo]): Option[HintInfo] = {
hints.reduceOption((h1, h2) => HintInfo(
broadcast = h1.broadcast || h2.broadcast))
hints.reduceOption((h1, h2) => h1.merge(h2, handleOverriddenHintInfo))
}

private def collectHints(plan: LogicalPlan): Seq[HintInfo] = {
/**
* Extract all hints from the plan, returning a list of extracted hints and the transformed plan
* with [[ResolvedHint]] nodes removed. The returned hint list comes in top-down order.
* Note that hints can only be extracted from under certain nodes. Those that cannot be extracted
* in this method will be cleaned up later by this rule, and may emit warnings depending on the
* configurations.
*/
private def extractHintsFromPlan(plan: LogicalPlan): (LogicalPlan, Seq[HintInfo]) = {
plan match {
case h: ResolvedHint => collectHints(h.child) :+ h.hints
case u: UnaryNode => collectHints(u.child)
case h: ResolvedHint =>
val (plan, hints) = extractHintsFromPlan(h.child)
(plan, h.hints +: hints)
case u: UnaryNode =>
val (plan, hints) = extractHintsFromPlan(u.child)
(u.withNewChildren(Seq(plan)), hints)
// TODO revisit this logic:
// except and intersect are semi/anti-joins which won't return more data then
// their left argument, so the broadcast hint should be propagated here
case i: Intersect => collectHints(i.left)
case e: Except => collectHints(e.left)
case _ => Seq.empty
case i: Intersect =>
val (plan, hints) = extractHintsFromPlan(i.left)
(i.copy(left = plan), hints)
case e: Except =>
val (plan, hints) = extractHintsFromPlan(e.left)
(e.copy(left = plan), hints)
case p: LogicalPlan => (p, Seq.empty)
}
}

private def handleInvalidHintInfo(hint: HintInfo): Unit = {
logWarning(s"A join hint $hint is specified but it is not part of a join relation.")
}

private def handleOverriddenHintInfo(hint: HintInfo): Unit = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's a little weird to see this method being defined twice. Can we just log the message inside HintInfo.merge?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking to have a centralized handler for all kinds of hint events/errors, and the action, whether to log warnings/errors or to throw exceptions, can be configurable. WDYT?

logWarning(s"Join hint $hint is overridden by another hint and will not take effect.")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,17 +66,94 @@ object JoinHint {
/**
* The hint attributes to be applied on a specific node.
*
* @param broadcast If set to true, it indicates that the broadcast hash join is the preferred join
* strategy and the node with this hint is preferred to be the build side.
* @param strategy The preferred join strategy.
*/
case class HintInfo(broadcast: Boolean = false) {
case class HintInfo(strategy: Option[JoinStrategyHint] = None) {

override def toString: String = {
val hints = scala.collection.mutable.ArrayBuffer.empty[String]
if (broadcast) {
hints += "broadcast"
/**
* Combine this [[HintInfo]] with another [[HintInfo]] and return the new [[HintInfo]].
* @param other the other [[HintInfo]]
* @param hintOverriddenCallback a callback to notify if any [[HintInfo]] has been overridden
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we create a hint merging strategy framework, I think it will not be an arbitrary callback. Shall we make it simple now and leave it for future design? Then we can just log message inside this method.

* in this merge.
*
* Currently, for join strategy hints, the new [[HintInfo]] will contain the strategy in this
* [[HintInfo]] if defined, otherwise the strategy in the other [[HintInfo]]. The
* `hintOverriddenCallback` will be called if this [[HintInfo]] and the other [[HintInfo]]
* both have a strategy defined but the join strategies are different.
*/
def merge(other: HintInfo, hintOverriddenCallback: HintInfo => Unit): HintInfo = {
if (this.strategy.isDefined &&
other.strategy.isDefined &&
this.strategy.get != other.strategy.get) {
hintOverriddenCallback(other)
}

if (hints.isEmpty) "none" else hints.mkString("(", ", ", ")")
HintInfo(strategy = this.strategy.orElse(other.strategy))
}

override def toString: String = strategy.map(s => s"(strategy=$s)").getOrElse("none")
}

sealed abstract class JoinStrategyHint {

def displayName: String
def hintAliases: Set[String]

override def toString: String = displayName
}

/**
* The enumeration of join strategy hints.
*
* The hinted strategy will be used for the join with which it is associated if doable. In case
* of contradicting strategy hints specified for each side of the join, hints are prioritized as
* BROADCAST over SHUFFLE_MERGE over SHUFFLE_HASH over SHUFFLE_REPLICATE_NL.
*/
object JoinStrategyHint {

val strategies: Set[JoinStrategyHint] = Set(
BROADCAST,
SHUFFLE_MERGE,
SHUFFLE_HASH,
SHUFFLE_REPLICATE_NL)
}

/**
* The hint for broadcast hash join or broadcast nested loop join, depending on the availability of
* equi-join keys.
*/
case object BROADCAST extends JoinStrategyHint {
override def displayName: String = "broadcast"
override def hintAliases: Set[String] = Set(
"BROADCAST",
"BROADCASTJOIN",
"MAPJOIN")
}

/**
* The hint for shuffle sort merge join.
*/
case object SHUFFLE_MERGE extends JoinStrategyHint {
override def displayName: String = "merge"
override def hintAliases: Set[String] = Set(
"SHUFFLE_MERGE",
"MERGE",
"MERGEJOIN")
}

/**
* The hint for shuffle hash join.
*/
case object SHUFFLE_HASH extends JoinStrategyHint {
override def displayName: String = "shuffle_hash"
override def hintAliases: Set[String] = Set(
"SHUFFLE_HASH")
}

/**
* The hint for shuffle-and-replicate nested loop join, a.k.a. cartesian product join.
*/
case object SHUFFLE_REPLICATE_NL extends JoinStrategyHint {
override def displayName: String = "shuffle_replicate_nl"
override def hintAliases: Set[String] = Set(
"SHUFFLE_REPLICATE_NL")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This hint for cartesian products is useful for users?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. In the default logic, broadcast-nl is prioritized over shuffle-replicate-nl (cartesian-product), so this can be used for special cases where shuffle-replicate-nl is favored.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we might need a code comment to explain SHUFFLE_REPLICATE_NL is cartesian products.

}
Loading