
Commit 86ff8b1

marmbrus authored and rxin committed
Generalize pattern for planning hash joins.

This will be helpful for [SPARK-1495](https://issues.apache.org/jira/browse/SPARK-1495) and other cases where we want to have custom hash join implementations but don't want to repeat the logic for finding the join keys.

Author: Michael Armbrust <[email protected]>

Closes #418 from marmbrus/hashFilter and squashes the following commits:

d5cc79b [Michael Armbrust] Address @rxin 's comments.
366b6d9 [Michael Armbrust] style fixes
14560eb [Michael Armbrust] Generalize pattern for planning hash joins.
f4809c1 [Michael Armbrust] Move common functions to PredicateHelper.
1 parent cd12dd9 commit 86ff8b1
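The payoff described in the commit message is that a custom hash join (for example the broadcast join discussed in SPARK-1495) no longer has to repeat the key-extraction logic: it can simply match on the `HashFilteredJoin` pattern introduced here. Below is a rough sketch of such a strategy, not something this commit adds. It mirrors the rewritten `HashJoin` strategy further down; `BroadcastHashJoin` is a hypothetical physical operator, and the object would need to live inside `SparkStrategies` so that `Strategy`, `planLater`, and the execution operators are in scope.

// Sketch only, not part of this commit. Assumes a hypothetical
// execution.BroadcastHashJoin(leftKeys, rightKeys, left, right) physical operator and
// placement inside SparkStrategies, alongside the HashJoin strategy shown below.
object BroadcastHashJoinSketch extends Strategy {
  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    // HashFilteredJoin already hands back the equi-join keys and the residual
    // condition, so none of the key-finding logic is repeated here.
    case HashFilteredJoin(Inner, leftKeys, rightKeys, condition, left, right) =>
      val broadcastJoin = execution.BroadcastHashJoin(  // hypothetical operator
        leftKeys, rightKeys, planLater(left), planLater(right))
      condition.map(Filter(_, broadcastJoin)).getOrElse(broadcastJoin) :: Nil
    case _ => Nil
  }
}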


3 files changed: +82 -48 lines changed


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala

Lines changed: 23 additions & 6 deletions
@@ -17,10 +17,11 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
-import org.apache.spark.sql.catalyst.trees
-import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.analysis.UnresolvedException
-import org.apache.spark.sql.catalyst.types.{BooleanType, StringType, TimestampType}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.trees
+import org.apache.spark.sql.catalyst.types.BooleanType
+
 
 object InterpretedPredicate {
   def apply(expression: Expression): (Row => Boolean) = {
@@ -37,10 +38,26 @@ trait Predicate extends Expression {
 }
 
 trait PredicateHelper {
-  def splitConjunctivePredicates(condition: Expression): Seq[Expression] = condition match {
-    case And(cond1, cond2) => splitConjunctivePredicates(cond1) ++ splitConjunctivePredicates(cond2)
-    case other => other :: Nil
+  protected def splitConjunctivePredicates(condition: Expression): Seq[Expression] = {
+    condition match {
+      case And(cond1, cond2) =>
+        splitConjunctivePredicates(cond1) ++ splitConjunctivePredicates(cond2)
+      case other => other :: Nil
+    }
   }
+
+  /**
+   * Returns true if `expr` can be evaluated using only the output of `plan`. This method
+   * can be used to determine when it is acceptable to move expression evaluation within a query
+   * plan.
+   *
+   * For example consider a join between two relations R(a, b) and S(c, d).
+   *
+   * `canEvaluate(Equals(a, b), R)` returns `true` whereas `canEvaluate(Equals(a, c), R)` returns
+   * `false`.
+   */
+  protected def canEvaluate(expr: Expression, plan: LogicalPlan): Boolean =
+    expr.references.subsetOf(plan.outputSet)
 }
 
 abstract class BinaryPredicate extends BinaryExpression with Predicate {
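For a concrete sense of the two helpers added to `PredicateHelper`, here is a small sketch (not part of this commit) based on the R(a, b) / S(c, d) example from the scaladoc above. The `LocalRelation` leaves and `IntegerType` attributes are only stand-ins for plans whose output sets are {a, b} and {c, d}; the exact construction is assumed and may differ by version.

// Sketch only: exercises splitConjunctivePredicates and canEvaluate using two
// stand-in relations R(a, b) and S(c, d).
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.catalyst.types.IntegerType

object PredicateHelperSketch extends PredicateHelper {
  val a = AttributeReference("a", IntegerType)()
  val b = AttributeReference("b", IntegerType)()
  val c = AttributeReference("c", IntegerType)()
  val d = AttributeReference("d", IntegerType)()

  val r = LocalRelation(a, b)  // stands in for relation R(a, b)
  val s = LocalRelation(c, d)  // stands in for relation S(c, d)

  // The And is flattened into its conjuncts: Seq(Equals(a, c), GreaterThan(b, Literal(5))).
  val conjuncts = splitConjunctivePredicates(And(Equals(a, c), GreaterThan(b, Literal(5))))

  // Equals(a, b) references only R's output, so it can be evaluated against R ...
  val evaluableOnR = canEvaluate(Equals(a, b), r)   // true
  // ... while Equals(a, c) also needs S's output.
  val needsBothSides = canEvaluate(Equals(a, c), r) // false
}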

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala

Lines changed: 52 additions & 0 deletions
@@ -19,7 +19,10 @@ package org.apache.spark.sql.catalyst.planning
 
 import scala.annotation.tailrec
 
+import org.apache.spark.sql.Logging
+
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 
 /**
@@ -101,6 +104,55 @@ object PhysicalOperation extends PredicateHelper {
   }
 }
 
+/**
+ * A pattern that finds joins with equality conditions that can be evaluated using hashing
+ * techniques. For inner joins, any filters on top of the join operator are also matched.
+ */
+object HashFilteredJoin extends Logging with PredicateHelper {
+  /** (joinType, leftKeys, rightKeys, condition, leftChild, rightChild) */
+  type ReturnType =
+    (JoinType, Seq[Expression], Seq[Expression], Option[Expression], LogicalPlan, LogicalPlan)
+
+  def unapply(plan: LogicalPlan): Option[ReturnType] = plan match {
+    // All predicates can be evaluated for inner join (i.e., those that are in the ON
+    // clause and WHERE clause.)
+    case FilteredOperation(predicates, join @ Join(left, right, Inner, condition)) =>
+      logger.debug(s"Considering hash inner join on: ${predicates ++ condition}")
+      splitPredicates(predicates ++ condition, join)
+    case join @ Join(left, right, joinType, condition) =>
+      logger.debug(s"Considering hash join on: $condition")
+      splitPredicates(condition.toSeq, join)
+    case _ => None
+  }
+
+  // Find equi-join predicates that can be evaluated before the join, and thus can be used
+  // as join keys.
+  def splitPredicates(allPredicates: Seq[Expression], join: Join): Option[ReturnType] = {
+    val Join(left, right, joinType, _) = join
+    val (joinPredicates, otherPredicates) = allPredicates.partition {
+      case Equals(l, r) if (canEvaluate(l, left) && canEvaluate(r, right)) ||
+        (canEvaluate(l, right) && canEvaluate(r, left)) => true
+      case _ => false
+    }
+
+    val joinKeys = joinPredicates.map {
+      case Equals(l, r) if canEvaluate(l, left) && canEvaluate(r, right) => (l, r)
+      case Equals(l, r) if canEvaluate(l, right) && canEvaluate(r, left) => (r, l)
+    }
+
+    // Do not consider this strategy if there are no join keys.
+    if (joinKeys.nonEmpty) {
+      val leftKeys = joinKeys.map(_._1)
+      val rightKeys = joinKeys.map(_._2)
+
+      Some((joinType, leftKeys, rightKeys, otherPredicates.reduceOption(And), left, right))
+    } else {
+      logger.debug(s"Avoiding hash join with no join keys.")
+      None
+    }
+  }
+}
+
 /**
  * A pattern that collects all adjacent unions and returns their children as a Seq.
  */
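To see what the new extractor actually yields, consider a WHERE-style filter sitting on top of an inner join of the same stand-in relations. The sketch below is not part of this commit; the attributes and `LocalRelation` leaves are the hypothetical stand-ins from the previous sketch, re-declared here so the snippet is self-contained.

// Sketch only: shows what HashFilteredJoin extracts when the predicates live in a
// Filter above a condition-less inner Join. R(a, b) and S(c, d) are stand-in relations.
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.HashFilteredJoin
import org.apache.spark.sql.catalyst.plans.Inner
import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LocalRelation}
import org.apache.spark.sql.catalyst.types.IntegerType

object HashFilteredJoinSketch {
  val a = AttributeReference("a", IntegerType)()
  val b = AttributeReference("b", IntegerType)()
  val c = AttributeReference("c", IntegerType)()
  val d = AttributeReference("d", IntegerType)()
  val r = LocalRelation(a, b)  // stands in for R(a, b)
  val s = LocalRelation(c, d)  // stands in for S(c, d)

  // Roughly: SELECT ... FROM r JOIN s WHERE a = c AND b > 5.
  val plan = Filter(And(Equals(a, c), GreaterThan(b, Literal(5))), Join(r, s, Inner, None))

  val extracted = plan match {
    // FilteredOperation (used inside HashFilteredJoin) splits the filter into conjuncts;
    // Equals(a, c) spans both sides and becomes the key pair, while GreaterThan(b, 5)
    // only needs the left side and comes back as the residual condition.
    case HashFilteredJoin(Inner, leftKeys, rightKeys, condition, left, right) =>
      // leftKeys == Seq(a), rightKeys == Seq(c), condition == Some(GreaterThan(b, Literal(5)))
      Some((leftKeys, rightKeys, condition))
    case _ => None
  }
}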

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 7 additions & 42 deletions
@@ -28,51 +28,16 @@ import org.apache.spark.sql.parquet._
 private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
   self: SQLContext#SparkPlanner =>
 
-  object HashJoin extends Strategy {
+  object HashJoin extends Strategy with PredicateHelper {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case FilteredOperation(predicates, logical.Join(left, right, Inner, condition)) =>
-        logger.debug(s"Considering join: ${predicates ++ condition}")
-        // Find equi-join predicates that can be evaluated before the join, and thus can be used
-        // as join keys. Note we can only mix in the conditions with other predicates because the
-        // match above ensures that this is and Inner join.
-        val (joinPredicates, otherPredicates) = (predicates ++ condition).partition {
-          case Equals(l, r) if (canEvaluate(l, left) && canEvaluate(r, right)) ||
-            (canEvaluate(l, right) && canEvaluate(r, left)) => true
-          case _ => false
-        }
-
-        val joinKeys = joinPredicates.map {
-          case Equals(l,r) if canEvaluate(l, left) && canEvaluate(r, right) => (l, r)
-          case Equals(l,r) if canEvaluate(l, right) && canEvaluate(r, left) => (r, l)
-        }
-
-        // Do not consider this strategy if there are no join keys.
-        if (joinKeys.nonEmpty) {
-          val leftKeys = joinKeys.map(_._1)
-          val rightKeys = joinKeys.map(_._2)
-
-          val joinOp = execution.HashJoin(
-            leftKeys, rightKeys, BuildRight, planLater(left), planLater(right))
-
-          // Make sure other conditions are met if present.
-          if (otherPredicates.nonEmpty) {
-            execution.Filter(combineConjunctivePredicates(otherPredicates), joinOp) :: Nil
-          } else {
-            joinOp :: Nil
-          }
-        } else {
-          logger.debug(s"Avoiding spark join with no join keys.")
-          Nil
-        }
+      // Find inner joins where at least some predicates can be evaluated by matching hash keys
+      // using the HashFilteredJoin pattern.
+      case HashFilteredJoin(Inner, leftKeys, rightKeys, condition, left, right) =>
+        val hashJoin =
+          execution.HashJoin(leftKeys, rightKeys, BuildRight, planLater(left), planLater(right))
+        condition.map(Filter(_, hashJoin)).getOrElse(hashJoin) :: Nil
       case _ => Nil
     }
-
-    private def combineConjunctivePredicates(predicates: Seq[Expression]) =
-      predicates.reduceLeft(And)
-
-    /** Returns true if `expr` can be evaluated using only the output of `plan`. */
-    protected def canEvaluate(expr: Expression, plan: LogicalPlan): Boolean =
-      expr.references subsetOf plan.outputSet
   }
 
   object PartialAggregation extends Strategy {
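The only subtle line in the rewritten HashJoin strategy above is `condition.map(Filter(_, hashJoin)).getOrElse(hashJoin)`: the join is wrapped in a `Filter` only when `HashFilteredJoin` returned a residual condition, replacing the old `combineConjunctivePredicates` branch. A tiny plain-Scala illustration of that `Option` idiom (strings stand in for physical plans; not part of this commit):

object OptionFilterIdiom {
  // Wrap the join in a Filter only when a residual condition exists.
  def wrap(condition: Option[String], join: String): String =
    condition.map(c => s"Filter($c, $join)").getOrElse(join)

  def main(args: Array[String]): Unit = {
    println(wrap(Some("b > 5"), "HashJoin"))  // Filter(b > 5, HashJoin)
    println(wrap(None, "HashJoin"))           // HashJoin
  }
}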
