@@ -76,25 +76,28 @@ object HandleOuterJoinBuildSideSkew extends Rule[LogicalPlan]
7676 val maxBloomFilterEntries = conf.dynamicBloomFilterJoinPruningMaxBloomFilterEntries
7777
7878 if (rightSize.max > Utils .median(rightSize, false ) * threshold) {
79- // 1. Insert bloom filter
80- val insertBF = if (left.stats.rowCount.exists(_ <= maxBloomFilterEntries)) {
81- insertPredicate(rightKeys, right, leftKeys, left)
82- } else {
83- right
84- }
85- // TODO: 2. Insert partial aggregate
86- val joinAttrs = join.condition.map(_.references.filter(canEvaluate(_, right)).toSeq)
87- .getOrElse(Nil )
88- val insertPartialAgg =
89- if (joinAttrs.nonEmpty) PartialAggregate (joinAttrs, joinAttrs, insertBF) else insertBF
79+ val (innerJoinRight, leftAntiJoinRight) =
80+ if (left.stats.rowCount.exists(_ <= maxBloomFilterEntries)) {
81+ // 1. Insert bloom filter
82+ val insertBF = insertPredicate(rightKeys, right, leftKeys, left)
83+ (insertBF, insertBF)
84+ } else {
85+ // 2. Insert partial aggregate for right side of left anti join
86+ val joinAttrs =
87+ join.condition.map(_.references.filter(canEvaluate(_, right)).toSeq.distinct)
88+ .getOrElse(Nil )
89+ val insertPartialAgg =
90+ if (joinAttrs.nonEmpty) PartialAggregate (joinAttrs, joinAttrs, right) else right
91+ (right, insertPartialAgg)
92+ }
9093
9194 // Should not convert to BHJ
9295 val joinHint = JoinHint (Some (HintInfo (strategy = Some (NO_BROADCAST_HASH ))), None )
9396 val union = Union (
94- join.copy(right = insertBF , joinType = Inner , hint = joinHint),
97+ join.copy(right = innerJoinRight , joinType = Inner , hint = joinHint),
9598 Project (left.output ++
9699 right.output.map(name => Alias (Literal (null , name.dataType), name.name)()),
97- Join (left, insertBF , LeftAnti , join.condition, join.hint)))
100+ Join (left, leftAntiJoinRight , LeftAnti , join.condition, join.hint)))
98101 union.addOptimizeTag(s " created by ${this .simpleRuleName}" )
99102 union
100103 } else {
0 commit comments