From 775f59e92f6b0f0660551045be6d4194f75b236f Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Wed, 10 Aug 2016 09:57:26 -0700
Subject: [PATCH 1/2] [SPARK-16994][SQL] PushDownPredicate should not ignore limits.

---
 .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 3 +++
 .../src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 5 +++++
 2 files changed, 8 insertions(+)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index f7aa6da0a5bd..66c8a35a2512 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -1212,6 +1212,9 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
     case filter @ Filter(_, _: Filter) => filter
     // should not push predicates through sample, or will generate different results.
     case filter @ Filter(_, _: Sample) => filter
+    // should not push predicates through limit, or will generate different results.
+    case filter @ Filter(_, _: GlobalLimit) => filter
+    case filter @ Filter(_, _: LocalLimit) => filter
     case filter @ Filter(condition, u: UnaryNode)
         if u.expressions.forall(_.deterministic) =>
       pushDownPredicate(filter, u.child) { predicate =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 4fcde58833d7..1538386be00f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -1730,6 +1730,11 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
     }
   }
 
+  test("SPARK-16994: filter should not be pushed down into local limit") {
+    checkAnswer(spark.createDataset(1 to 100).limit(10).filter($"value" % 10 === 0).toDF(),
+      Row(10) :: Nil)
+  }
+
   test("Struct Star Expansion") {
     val structDf = testData2.select("a", "b").as("record")
 

From eea0dfe2fa11dbd22e20a940747381024c76729c Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Wed, 10 Aug 2016 21:26:41 -0700
Subject: [PATCH 2/2] Move the testcase into FilterPushdownSuite.
---
 .../sql/catalyst/optimizer/FilterPushdownSuite.scala | 11 +++++++++++
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala   |  5 -----
 2 files changed, 11 insertions(+), 5 deletions(-)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index 596b8fcea194..113533347d99 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -150,6 +150,17 @@ class FilterPushdownSuite extends PlanTest {
     comparePlans(optimized, originalQuery)
   }
 
+  test("SPARK-16994: filter should not be pushed down into local limit") {
+    val originalQuery = testRelation
+      .limit(1)
+      .where('a % 2 === 0)
+      .analyze
+
+    val optimized = Optimize.execute(originalQuery)
+
+    comparePlans(optimized, originalQuery)
+  }
+
   test("filters: combines filters") {
     val originalQuery = testRelation
       .select('a)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 1538386be00f..4fcde58833d7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -1730,11 +1730,6 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
     }
   }
 
-  test("SPARK-16994: filter should not be pushed down into local limit") {
-    checkAnswer(spark.createDataset(1 to 100).limit(10).filter($"value" % 10 === 0).toDF(),
-      Row(10) :: Nil)
-  }
-
   test("Struct Star Expansion") {
     val structDf = testData2.select("a", "b").as("record")
 
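Note (illustrative, not part of the patches above): the optimizer change keeps a Filter
from being rewritten below a GlobalLimit/LocalLimit because the two operator orders
return different rows. A minimal sketch of the difference, mirroring the test case in
patch 1 and assuming a running SparkSession named `spark`:

    import spark.implicits._

    val ds = spark.createDataset(1 to 100)

    // Plan as written: take the first 10 rows, then keep multiples of 10.
    ds.limit(10).filter($"value" % 10 === 0).collect()
    // => Array(10)

    // Plan the optimizer must NOT produce: filtering first yields the ten
    // multiples of 10 in 1..100, and the limit then keeps all of them.
    ds.filter($"value" % 10 === 0).limit(10).collect()
    // => Array(10, 20, ..., 100)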