diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 3273a61dc7b35..3a3ccd5ff5e60 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -332,12 +332,11 @@ object LimitPushDown extends Rule[LogicalPlan] { // pushdown Limit. case LocalLimit(exp, Union(children)) => LocalLimit(exp, Union(children.map(maybePushLocalLimit(exp, _)))) - // Add extra limits below OUTER JOIN. For LEFT OUTER and FULL OUTER JOIN we push limits to the - // left and right sides, respectively. For FULL OUTER JOIN, we can only push limits to one side - // because we need to ensure that rows from the limited side still have an opportunity to match - // against all candidates from the non-limited side. We also need to ensure that this limit - // pushdown rule will not eventually introduce limits on both sides if it is applied multiple - // times. Therefore: + // Add extra limits below OUTER JOIN. For LEFT OUTER and RIGHT OUTER JOIN we push limits to + // the left and right sides, respectively. It's not safe to push limits below FULL OUTER + // JOIN in the general case without a more invasive rewrite. + // We also need to ensure that this limit pushdown rule will not eventually introduce limits + // on both sides if it is applied multiple times. Therefore: // - If one side is already limited, stack another limit on top if the new limit is smaller. // The redundant limit will be collapsed by the CombineLimits rule. // - If neither side is limited, limit the side that is estimated to be bigger. @@ -345,19 +344,6 @@ object LimitPushDown extends Rule[LogicalPlan] { val newJoin = joinType match { case RightOuter => join.copy(right = maybePushLocalLimit(exp, right)) case LeftOuter => join.copy(left = maybePushLocalLimit(exp, left)) - case FullOuter => - (left.maxRows, right.maxRows) match { - case (None, None) => - if (left.stats.sizeInBytes >= right.stats.sizeInBytes) { - join.copy(left = maybePushLocalLimit(exp, left)) - } else { - join.copy(right = maybePushLocalLimit(exp, right)) - } - case (Some(_), Some(_)) => join - case (Some(_), None) => join.copy(left = maybePushLocalLimit(exp, left)) - case (None, Some(_)) => join.copy(right = maybePushLocalLimit(exp, right)) - - } case _ => join } LocalLimit(exp, newJoin) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala index f50e2e86516f0..cc98d2350c777 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala @@ -113,35 +113,34 @@ class LimitPushdownSuite extends PlanTest { test("full outer join where neither side is limited and both sides have same statistics") { assert(x.stats.sizeInBytes === y.stats.sizeInBytes) - val originalQuery = x.join(y, FullOuter).limit(1) - val optimized = Optimize.execute(originalQuery.analyze) - val correctAnswer = Limit(1, LocalLimit(1, x).join(y, FullOuter)).analyze - comparePlans(optimized, correctAnswer) + val originalQuery = x.join(y, FullOuter).limit(1).analyze + val optimized = Optimize.execute(originalQuery) + // No pushdown for FULL OUTER JOINS. + comparePlans(optimized, originalQuery) } test("full outer join where neither side is limited and left side has larger statistics") { val xBig = testRelation.copy(data = Seq.fill(2)(null)).subquery('x) assert(xBig.stats.sizeInBytes > y.stats.sizeInBytes) - val originalQuery = xBig.join(y, FullOuter).limit(1) - val optimized = Optimize.execute(originalQuery.analyze) - val correctAnswer = Limit(1, LocalLimit(1, xBig).join(y, FullOuter)).analyze - comparePlans(optimized, correctAnswer) + val originalQuery = xBig.join(y, FullOuter).limit(1).analyze + val optimized = Optimize.execute(originalQuery) + // No pushdown for FULL OUTER JOINS. + comparePlans(optimized, originalQuery) } test("full outer join where neither side is limited and right side has larger statistics") { val yBig = testRelation.copy(data = Seq.fill(2)(null)).subquery('y) assert(x.stats.sizeInBytes < yBig.stats.sizeInBytes) - val originalQuery = x.join(yBig, FullOuter).limit(1) - val optimized = Optimize.execute(originalQuery.analyze) - val correctAnswer = Limit(1, x.join(LocalLimit(1, yBig), FullOuter)).analyze - comparePlans(optimized, correctAnswer) + val originalQuery = x.join(yBig, FullOuter).limit(1).analyze + val optimized = Optimize.execute(originalQuery) + // No pushdown for FULL OUTER JOINS. + comparePlans(optimized, originalQuery) } test("full outer join where both sides are limited") { - val originalQuery = x.limit(2).join(y.limit(2), FullOuter).limit(1) - val optimized = Optimize.execute(originalQuery.analyze) - val correctAnswer = Limit(1, Limit(2, x).join(Limit(2, y), FullOuter)).analyze - comparePlans(optimized, correctAnswer) + val originalQuery = x.limit(2).join(y.limit(2), FullOuter).limit(1).analyze + val optimized = Optimize.execute(originalQuery) + // No pushdown for FULL OUTER JOINS. + comparePlans(optimized, originalQuery) } } -