Skip to content

Commit 1a8da2a

Browse files
committed
add BroadcastLeftSemiJoinHash
1 parent 0e532cc commit 1a8da2a

File tree

1 file changed

+69
-0
lines changed

1 file changed

+69
-0
lines changed
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.execution.joins
19+
20+
import org.apache.spark.annotation.DeveloperApi
21+
import org.apache.spark.sql.catalyst.expressions.{Expression, Row}
22+
import org.apache.spark.sql.catalyst.plans.physical.ClusteredDistribution
23+
import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
24+
25+
/**
 * :: DeveloperApi ::
 * Left semi join implemented with a broadcast hash set: the join keys of the right table are
 * collected into a HashSet, the set is broadcast, and the left table is then streamed through,
 * keeping only the rows whose join keys are present in the set.
 *
 * Keys that contain a null are never added to the set and never looked up, so a row with a
 * null join key on either side does not produce a match.
 */
@DeveloperApi
case class BroadcastLeftSemiJoinHash(
    leftKeys: Seq[Expression],
    rightKeys: Seq[Expression],
    left: SparkPlan,
    right: SparkPlan) extends BinaryNode with HashJoin {

  override val buildSide = BuildRight

  // Only rows of the left child are emitted, so the output attributes are the left child's.
  override def output = left.output

  override def execute() = {
    val hashSet = new java.util.HashSet[Row]()

    // Create a Hash set of buildKeys.
    // Each build row is copied before collect(); presumably the upstream operator may reuse
    // its row objects -- TODO confirm against the SparkPlan.execute() contract.
    buildPlan.execute().map(_.copy()).collect().foreach { buildRow =>
      val rowKey = buildSideKeyGenerator(buildRow)
      if (!rowKey.anyNull) {
        // HashSet.add leaves the set unchanged when the key is already present, so no
        // separate contains() check is needed.
        hashSet.add(rowKey)
      }
    }

    // Broadcast the key set so the streamed side can probe it from within its partitions.
    val broadcastedRelation = sparkContext.broadcast(hashSet)

    streamedPlan.execute().mapPartitions { streamIter =>
      // One key generator per partition; it is applied to every streamed row below.
      val joinKeys = streamSideKeyGenerator()
      streamIter.filter { current =>
        // joinKeys(current) must run first: the lookup uses joinKeys.currentValue, which is
        // read right after the generator has been applied to `current`.
        !joinKeys(current).anyNull && broadcastedRelation.value.contains(joinKeys.currentValue)
      }
    }
  }
}

0 commit comments

Comments
 (0)