From 4f946c99c8f0851e5d8ba22b44bffb0b6af60737 Mon Sep 17 00:00:00 2001
From: Liquan Pei
Date: Fri, 3 Oct 2014 14:42:13 -0700
Subject: [PATCH 1/4] add shuffled hash outer join

---
 .../spark/sql/execution/SparkStrategies.scala |   8 +
 .../apache/spark/sql/execution/joins.scala    | 111 ++++++++++++++++++
 .../org/apache/spark/sql/JoinSuite.scala      |   7 +-
 3 files changed, 123 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 45687d960404..2cfad8a2cc9b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -95,6 +95,14 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
             leftKeys, rightKeys, buildSide, planLater(left), planLater(right))
         condition.map(Filter(_, hashJoin)).getOrElse(hashJoin) :: Nil

+      case ExtractEquiJoinKeys(LeftOuter, leftKeys, rightKeys, condition, left, right) =>
+        execution.ShuffledHashOuterJoin(
+          leftKeys, rightKeys, BuildRight, LeftOuter, condition, planLater(left), planLater(right)) :: Nil
+
+      case ExtractEquiJoinKeys(RightOuter, leftKeys, rightKeys, condition, left, right) =>
+        execution.ShuffledHashOuterJoin(
+          leftKeys, rightKeys, BuildLeft, RightOuter, condition, planLater(left), planLater(right)) :: Nil
+
       case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right) =>
         execution.HashOuterJoin(
           leftKeys, rightKeys, joinType, condition, planLater(left), planLater(right)) :: Nil
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala
index 2890a563bed4..f7c0be757fc7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala
@@ -137,6 +137,117 @@ trait HashJoin {
   }
 }

+/**
+ * :: DeveloperApi ::
+ * Performs a left or right outer hash join of two child relations by first shuffling the data
+ * using the join keys. The preserved side is streamed against a hash table built from the
+ * other side, so LeftOuter is expected with BuildRight and RightOuter with BuildLeft.
+ */
+@DeveloperApi
+case class ShuffledHashOuterJoin(
+    leftKeys: Seq[Expression],
+    rightKeys: Seq[Expression],
+    buildSide: BuildSide,
+    joinType: JoinType,
+    condition: Option[Expression],
+    left: SparkPlan,
+    right: SparkPlan) extends BinaryNode with HashJoin {
+
+  override def outputPartitioning: Partitioning = joinType match {
+    case LeftOuter => left.outputPartitioning
+    case RightOuter => right.outputPartitioning
+    case x =>
+      throw new Exception(s"ShuffledHashOuterJoin should not take $x as the JoinType")
+  }
+
+  override def requiredChildDistribution =
+    ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
+
+  override def output = {
+    joinType match {
+      case LeftOuter =>
+        left.output ++ right.output.map(_.withNullability(true))
+      case RightOuter =>
+        left.output.map(_.withNullability(true)) ++ right.output
+      case FullOuter =>
+        left.output.map(_.withNullability(true)) ++ right.output.map(_.withNullability(true))
+      case x =>
+        throw new Exception(s"ShuffledHashOuterJoin should not take $x as the JoinType")
+    }
+  }
+
+  /** Hashes the build side by join key; rows whose key contains a null are left out. */
+  private def buildHashTable(buildIter: Iterator[Row]) = {
+    val hashTable = new java.util.HashMap[Row, CompactBuffer[Row]]()
+    var currentRow: Row = null
+    while (buildIter.hasNext) {
+      currentRow = buildIter.next()
+      val rowKey = buildSideKeyGenerator(currentRow)
+      if (!rowKey.anyNull) {
+        val existingMatchList = hashTable.get(rowKey)
+        val matchList = if (existingMatchList == null) {
+          val newMatchList = new CompactBuffer[Row]()
+          hashTable.put(rowKey, newMatchList)
+          newMatchList
+        } else {
+          existingMatchList
+        }
+        matchList += currentRow.copy()
+      }
+    }
+    hashTable
+  }
+
+  /**
+   * Joins one partition. Each streamed row yields one joined row per build row that shares its
+   * key and satisfies `condition`; if there is no such build row, it yields exactly one row
+   * padded with nulls on the build side.
+   */
+  override def joinIterators(buildIter: Iterator[Row], streamIter: Iterator[Row]): Iterator[Row] = {
+    // TODO: Use Spark's HashMap implementation.
+    val hashTable = buildHashTable(buildIter)
+    val nullRow = buildSide match {
+      case BuildRight => new GenericRow(right.output.length)
+      case BuildLeft => new GenericRow(left.output.length)
+    }
+    val boundCondition =
+      condition.map(newPredicate(_, left.output ++ right.output)).getOrElse((row: Row) => true)
+
+    // Mutable per row objects. `probeRow` is only used to evaluate the condition, so looking
+    // ahead in the stream never overwrites the row most recently handed to the consumer.
+    val joinRow = new JoinedRow2
+    val probeRow = new JoinedRow2
+    val joinKeys = streamSideKeyGenerator()
+
+    // Output is always laid out as (left, right), whichever side is being streamed.
+    def combine(target: JoinedRow2, streamedRow: Row, buildRow: Row): Row = buildSide match {
+      case BuildRight => target(streamedRow, buildRow)
+      case BuildLeft => target(buildRow, streamedRow)
+    }
+
+    streamIter.flatMap { streamedRow =>
+      // A join key containing a null is never looked up, so such a row is always unmatched.
+      val candidates: CompactBuffer[Row] =
+        if (joinKeys(streamedRow).anyNull) null else hashTable.get(joinKeys.currentValue)
+      val matches: Seq[Row] =
+        if (candidates == null) {
+          Nil
+        } else {
+          candidates.filter(b => boundCondition(combine(probeRow, streamedRow, b)))
+        }
+      // No surviving match: emit the streamed row once, padded with nulls.
+      val buildRows: Seq[Row] = if (matches.isEmpty) Seq(nullRow) else matches
+      buildRows.iterator.map(b => combine(joinRow, streamedRow, b))
+    }
+  }
+
+  def execute() = {
+    buildPlan.execute().zipPartitions(streamedPlan.execute()) {
+      (buildIter, streamIter) => joinIterators(buildIter, streamIter)
+    }
+  }
+}
+
 /**
  * :: DeveloperApi ::
  * Performs a hash based outer join for two child relations by shuffling the data using
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 6c7697ece8c5..a4ab7b9451d6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -52,6 +52,7 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
       case j: LeftSemiJoinBNL => j
       case j: CartesianProduct => j
       case j: BroadcastNestedLoopJoin => j
+      case j: ShuffledHashOuterJoin => j
     }

     assert(operators.size === 1)
@@ -76,11 +77,11 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
       ("SELECT * FROM testData join testData2 ON key = a", classOf[ShuffledHashJoin]),
       ("SELECT * FROM testData join testData2 ON key = a and key=2", classOf[ShuffledHashJoin]),
       ("SELECT * FROM testData join testData2 ON key = a where key=2", classOf[ShuffledHashJoin]),
-      ("SELECT * FROM testData left join testData2 ON key = a", classOf[HashOuterJoin]),
+      ("SELECT * FROM testData left join testData2 ON key = a", classOf[ShuffledHashOuterJoin]),
       ("SELECT * FROM testData right join testData2 ON key = a where key=2",
-        classOf[HashOuterJoin]),
+        classOf[ShuffledHashOuterJoin]),
       ("SELECT * FROM testData right join testData2 ON key = a and key=2",
-        classOf[HashOuterJoin]),
+        classOf[ShuffledHashOuterJoin]),
       ("SELECT * FROM testData full outer join testData2 ON key = a", classOf[HashOuterJoin]),
       ("SELECT * FROM testData join testData2 ON key = a", classOf[ShuffledHashJoin]),
       ("SELECT * FROM testData join testData2 ON key = a and key=2", classOf[ShuffledHashJoin]),
From 4d816dbe9ad22c8162d22783dc24e91ec01a067e Mon Sep 17 00:00:00 2001
From: Liquan Pei
Date: Wed, 8 Oct 2014 17:23:08 -0700
Subject: [PATCH
2/4] style fixes --- .../org/apache/spark/sql/execution/SparkStrategies.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 2cfad8a2cc9b..8d31161b69ed 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -97,11 +97,13 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case ExtractEquiJoinKeys(LeftOuter, leftKeys, rightKeys, condition, left, right) => execution.ShuffledHashOuterJoin( - leftKeys, rightKeys, BuildRight, LeftOuter, condition, planLater(left), planLater(right)) :: Nil + leftKeys, rightKeys, BuildRight, LeftOuter, + condition, planLater(left), planLater(right)) :: Nil case ExtractEquiJoinKeys(RightOuter, leftKeys, rightKeys, condition, left, right) => execution.ShuffledHashOuterJoin( - leftKeys, rightKeys, BuildLeft, RightOuter, condition, planLater(left), planLater(right)) :: Nil + leftKeys, rightKeys, BuildLeft, RightOuter, + condition, planLater(left), planLater(right)) :: Nil case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right) => execution.HashOuterJoin( From 6aea59c8295ed66d0cf785698dc135e817cc74f6 Mon Sep 17 00:00:00 2001 From: Liquan Pei Date: Tue, 14 Oct 2014 00:37:53 -0700 Subject: [PATCH 3/4] refactor --- .../spark/sql/execution/SparkStrategies.scala | 10 +- .../execution/joins/ShuffledHashJoin.scala | 105 +++++++++++++++++- .../org/apache/spark/sql/JoinSuite.scala | 7 +- 3 files changed, 109 insertions(+), 13 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index ca7345abdada..5d7283b02616 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -91,17 +91,17 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { joins.BuildLeft } val hashJoin = joins.ShuffledHashJoin( - leftKeys, rightKeys, buildSide, planLater(left), planLater(right)) + leftKeys, rightKeys, buildSide, Inner, condition, planLater(left), planLater(right)) condition.map(Filter(_, hashJoin)).getOrElse(hashJoin) :: Nil case ExtractEquiJoinKeys(LeftOuter, leftKeys, rightKeys, condition, left, right) => - execution.ShuffledHashOuterJoin( - leftKeys, rightKeys, BuildRight, LeftOuter, + joins.ShuffledHashJoin( + leftKeys, rightKeys, joins.BuildRight, LeftOuter, condition, planLater(left), planLater(right)) :: Nil case ExtractEquiJoinKeys(RightOuter, leftKeys, rightKeys, condition, left, right) => - execution.ShuffledHashOuterJoin( - leftKeys, rightKeys, BuildLeft, RightOuter, + joins.ShuffledHashJoin( + leftKeys, rightKeys, joins.BuildLeft, RightOuter, condition, planLater(left), planLater(right)) :: Nil case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala index 418c1c23e554..3e33f0dbc4a1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala @@ -18,10 +18,11 @@ package org.apache.spark.sql.execution.joins import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Partitioning} +import org.apache.spark.sql.catalyst.plans.{Inner, FullOuter, 
JoinType, LeftOuter, RightOuter}
 import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
-
+import org.apache.spark.util.collection.CompactBuffer
 /**
  * :: DeveloperApi ::
  * Performs an inner hash join of two child relations by first shuffling the data using the join
@@ -32,19 +33,96 @@ case class ShuffledHashJoin(
     leftKeys: Seq[Expression],
     rightKeys: Seq[Expression],
     buildSide: BuildSide,
+    joinType: JoinType,
+    condition: Option[Expression],
     left: SparkPlan,
     right: SparkPlan) extends BinaryNode with HashJoin {

-  override def outputPartitioning: Partitioning = left.outputPartitioning
+  override def outputPartitioning: Partitioning = joinType match {
+    case Inner => left.outputPartitioning
+    case LeftOuter => left.outputPartitioning
+    case RightOuter => right.outputPartitioning
+    case x => throw new Exception(s"ShuffledHashJoin should not take $x as the JoinType")
+  }

   override def requiredChildDistribution =
     ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil

+  override def output = {
+    joinType match {
+      case Inner =>
+        left.output ++ right.output
+      case LeftOuter =>
+        left.output ++ right.output.map(_.withNullability(true))
+      case RightOuter =>
+        left.output.map(_.withNullability(true)) ++ right.output
+      case x =>
+        throw new Exception(s"ShuffledHashJoin should not take $x as the JoinType")
+    }
+  }
+
+  // All-null stand-in for the build side of an outer join; null for any other join type.
+  private[this] lazy val nullRow = joinType match {
+    case LeftOuter => new GenericRow(right.output.length)
+    case RightOuter => new GenericRow(left.output.length)
+    case _ => null
+  }
+
+  private[this] lazy val boundCondition =
+    condition.map(newPredicate(_, left.output ++ right.output)).getOrElse((row: Row) => true)
+
+  /**
+   * Streams the preserved side of a LeftOuter or RightOuter join against `hashedRelation`.
+   * Each streamed row yields one joined row per build row that shares its key and satisfies
+   * `condition`; if there is no such build row, it yields exactly one row padded with nulls
+   * on the build side.
+   */
+  private def outerJoin(
+      streamIter: Iterator[Row],
+      hashedRelation: HashedRelation): Iterator[Row] = {
+    // Assumes the streamed side is the left child for LeftOuter, the right for RightOuter.
+    val streamedOnLeft = joinType match {
+      case LeftOuter => true
+      case RightOuter => false
+      case x => throw new Exception(s"ShuffledHashJoin should not take $x as the JoinType")
+    }
+
+    // Mutable per row objects. `probeRow` is only used to evaluate the condition, so looking
+    // ahead in the stream never overwrites the row most recently handed to the consumer.
+    val joinRow = new JoinedRow2
+    val probeRow = new JoinedRow2
+    val joinKeys = streamSideKeyGenerator()
+
+    // Output is always laid out as (left, right), whichever side is being streamed.
+    def combine(target: JoinedRow2, streamedRow: Row, buildRow: Row): Row =
+      if (streamedOnLeft) target(streamedRow, buildRow) else target(buildRow, streamedRow)
+
+    streamIter.flatMap { streamedRow =>
+      // A join key containing a null is never looked up, so such a row is always unmatched.
+      val candidates: CompactBuffer[Row] =
+        if (joinKeys(streamedRow).anyNull) null else hashedRelation.get(joinKeys.currentValue)
+      val matches: Seq[Row] =
+        if (candidates == null) {
+          Nil
+        } else {
+          candidates.filter(b => boundCondition(combine(probeRow, streamedRow, b)))
+        }
+      // No surviving match: emit the streamed row once, padded with nulls.
+      val buildRows: Seq[Row] = if (matches.isEmpty) Seq(nullRow) else matches
+      buildRows.iterator.map(b => combine(joinRow, streamedRow, b))
+    }
+  }
+
   override def execute() = {
     buildPlan.execute().zipPartitions(streamedPlan.execute()) {
       (buildIter, streamIter) =>
         val hashed = HashedRelation(buildIter, buildSideKeyGenerator)
-        hashJoin(streamIter, hashed)
+        joinType match {
+          case Inner => hashJoin(streamIter, hashed)
+          case LeftOuter => outerJoin(streamIter, hashed)
+          case RightOuter => outerJoin(streamIter, hashed)
+          case x => throw new Exception(s"ShuffledHashJoin should not take $x as the JoinType")
+        }
     }
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala index 79ad1700523f..a576e158bc39 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala @@ -53,7 +53,6 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach { case j: LeftSemiJoinBNL => j case j: CartesianProduct => j case j: BroadcastNestedLoopJoin => j - case j: ShuffledHashOuterJoin => j } assert(operators.size === 1) @@ -78,11 +77,11 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach { ("SELECT * FROM testData join testData2 ON key = a", classOf[ShuffledHashJoin]), ("SELECT * FROM testData join testData2 ON key = a and key=2", classOf[ShuffledHashJoin]), ("SELECT * FROM testData join testData2 ON key = a where key=2", classOf[ShuffledHashJoin]), - ("SELECT * FROM testData left join testData2 ON key = a", classOf[ShuffledHashOuterJoin]), + ("SELECT * FROM testData left join testData2 ON key = a", classOf[ShuffledHashJoin]), ("SELECT * FROM testData right join testData2 ON key = a where key=2", - classOf[ShuffledHashOuterJoin]), + classOf[ShuffledHashJoin]), ("SELECT * FROM testData right join testData2 ON key = a and key=2", - classOf[ShuffledHashOuterJoin]), + classOf[ShuffledHashJoin]), ("SELECT * FROM testData full outer join testData2 ON key = a", classOf[HashOuterJoin]), ("SELECT * FROM testData join testData2 ON key = a", classOf[ShuffledHashJoin]), ("SELECT * FROM testData join testData2 ON key = a and key=2", classOf[ShuffledHashJoin]), From 91b182fe7affd8f1518c260cd3ddffe8a5fe2b0d Mon Sep 17 00:00:00 2001 From: Liquan Pei Date: Tue, 28 Oct 2014 18:08:07 -0700 Subject: [PATCH 4/4] modify joinSuite.scala --- .../src/test/scala/org/apache/spark/sql/JoinSuite.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala index 8b4cf5bac018..a8a8454c45f7 
100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala @@ -75,11 +75,11 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach { ("SELECT * FROM testData JOIN testData2 ON key = a", classOf[ShuffledHashJoin]), ("SELECT * FROM testData JOIN testData2 ON key = a and key = 2", classOf[ShuffledHashJoin]), ("SELECT * FROM testData JOIN testData2 ON key = a where key = 2", classOf[ShuffledHashJoin]), - ("SELECT * FROM testData LEFT JOIN testData2 ON key = a", classOf[HashOuterJoin]), + ("SELECT * FROM testData LEFT JOIN testData2 ON key = a", classOf[ShuffledHashJoin]), ("SELECT * FROM testData RIGHT JOIN testData2 ON key = a where key = 2", - classOf[HashOuterJoin]), + classOf[ShuffledHashJoin]), ("SELECT * FROM testData right join testData2 ON key = a and key = 2", - classOf[HashOuterJoin]), + classOf[ShuffledHashJoin]), ("SELECT * FROM testData full outer join testData2 ON key = a", classOf[HashOuterJoin]) // TODO add BroadcastNestedLoopJoin ).foreach { case (query, joinClass) => assertJoin(query, joinClass) }