Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -257,8 +257,11 @@ object PushPredicateThroughProject extends Rule[LogicalPlan] {
* Check https://cwiki.apache.org/confluence/display/Hive/OuterJoinBehavior for more details
*/
object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
// split the condition expression into 3 parts,
// (canEvaluateInLeftSide, canEvaluateInRightSide, haveToEvaluateWithBothSide)
/**
* Splits join condition expressions into three categories based on the attributes required
* to evaluate them.
* @returns (canEvaluateInLeft, canEvaluateInRight, haveToEvaluateInBoth)
*/
private def split(condition: Seq[Expression], left: LogicalPlan, right: LogicalPlan) = {
val (leftEvaluateCondition, rest) =
condition.partition(_.references subsetOf left.outputSet)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,57 +105,39 @@ object PhysicalOperation extends PredicateHelper {
}

/**
* A pattern that finds joins with equality conditions that can be evaluated using hashing
* techniques. For inner joins, any filters on top of the join operator are also matched.
* A pattern that finds joins with equality conditions that can be evaluated using equi-join.
*/
object HashFilteredJoin extends Logging with PredicateHelper {
object ExtractEquiJoinKeys extends Logging with PredicateHelper {
/** (joinType, rightKeys, leftKeys, condition, leftChild, rightChild) */
type ReturnType =
(JoinType, Seq[Expression], Seq[Expression], Option[Expression], LogicalPlan, LogicalPlan)

def unapply(plan: LogicalPlan): Option[ReturnType] = plan match {
// All predicates can be evaluated for inner join (i.e., those that are in the ON
// clause and WHERE clause.)
case FilteredOperation(predicates, join @ Join(left, right, Inner, condition)) =>
logger.debug(s"Considering hash inner join on: ${predicates ++ condition}")
splitPredicates(predicates ++ condition, join)
// All predicates can be evaluated for left semi join (those that are in the WHERE
// clause can only from left table, so they can all be pushed down.)
case FilteredOperation(predicates, join @ Join(left, right, LeftSemi, condition)) =>
logger.debug(s"Considering hash left semi join on: ${predicates ++ condition}")
splitPredicates(predicates ++ condition, join)
case join @ Join(left, right, joinType, condition) =>
logger.debug(s"Considering hash join on: $condition")
splitPredicates(condition.toSeq, join)
case _ => None
}

// Find equi-join predicates that can be evaluated before the join, and thus can be used
// as join keys.
def splitPredicates(allPredicates: Seq[Expression], join: Join): Option[ReturnType] = {
val Join(left, right, joinType, _) = join
val (joinPredicates, otherPredicates) =
allPredicates.flatMap(splitConjunctivePredicates).partition {
case EqualTo(l, r) if (canEvaluate(l, left) && canEvaluate(r, right)) ||
(canEvaluate(l, right) && canEvaluate(r, left)) => true
case _ => false
logger.debug(s"Considering join on: $condition")
// Find equi-join predicates that can be evaluated before the join, and thus can be used
// as join keys.
val (joinPredicates, otherPredicates) =
condition.map(splitConjunctivePredicates).getOrElse(Nil).partition {
case EqualTo(l, r) if (canEvaluate(l, left) && canEvaluate(r, right)) ||
(canEvaluate(l, right) && canEvaluate(r, left)) => true
case _ => false
}

val joinKeys = joinPredicates.map {
case EqualTo(l, r) if canEvaluate(l, left) && canEvaluate(r, right) => (l, r)
case EqualTo(l, r) if canEvaluate(l, right) && canEvaluate(r, left) => (r, l)
}

val joinKeys = joinPredicates.map {
case EqualTo(l, r) if canEvaluate(l, left) && canEvaluate(r, right) => (l, r)
case EqualTo(l, r) if canEvaluate(l, right) && canEvaluate(r, left) => (r, l)
}

// Do not consider this strategy if there are no join keys.
if (joinKeys.nonEmpty) {
val leftKeys = joinKeys.map(_._1)
val rightKeys = joinKeys.map(_._2)

Some((joinType, leftKeys, rightKeys, otherPredicates.reduceOption(And), left, right))
} else {
logger.debug(s"Avoiding hash join with no join keys.")
None
}
if (joinKeys.nonEmpty) {
logger.debug(s"leftKeys:${leftKeys} | rightKeys:${rightKeys}")
Some((joinType, leftKeys, rightKeys, otherPredicates.reduceOption(And), left, right))
} else {
None
}
case _ => None
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {

object LeftSemiJoin extends Strategy with PredicateHelper {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
// Find left semi joins where at least some predicates can be evaluated by matching hash
// keys using the HashFilteredJoin pattern.
case HashFilteredJoin(LeftSemi, leftKeys, rightKeys, condition, left, right) =>
// Find left semi joins where at least some predicates can be evaluated by matching join keys
case ExtractEquiJoinKeys(LeftSemi, leftKeys, rightKeys, condition, left, right) =>
val semiJoin = execution.LeftSemiJoinHash(
leftKeys, rightKeys, planLater(left), planLater(right))
condition.map(Filter(_, semiJoin)).getOrElse(semiJoin) :: Nil
Expand All @@ -46,7 +45,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
}

/**
* Uses the HashFilteredJoin pattern to find joins where at least some of the predicates can be
* Uses the ExtractEquiJoinKeys pattern to find joins where at least some of the predicates can be
* evaluated by matching hash keys.
*/
object HashJoin extends Strategy with PredicateHelper {
Expand All @@ -65,7 +64,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
def broadcastTables: Seq[String] = sqlContext.joinBroadcastTables.split(",").toBuffer

def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case HashFilteredJoin(
case ExtractEquiJoinKeys(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe annotation on line 48 can be modified.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch.

Inner,
leftKeys,
rightKeys,
Expand All @@ -75,7 +74,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
if broadcastTables.contains(b.tableName) =>
broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildRight)

case HashFilteredJoin(
case ExtractEquiJoinKeys(
Inner,
leftKeys,
rightKeys,
Expand All @@ -85,7 +84,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
if broadcastTables.contains(b.tableName) =>
broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildLeft)

case HashFilteredJoin(Inner, leftKeys, rightKeys, condition, left, right) =>
case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) =>
val hashJoin =
execution.ShuffledHashJoin(
leftKeys, rightKeys, BuildRight, planLater(left), planLater(right))
Expand Down