-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-27342][SQL] Optimize Limit 0 queries #24271
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 5 commits
df05284
84f944c
d5fed71
5dd33f8
ce3e936
1936ec6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -167,6 +167,8 @@ abstract class Optimizer(sessionCatalog: SessionCatalog) | |
| // since the other rules might make two separate Unions operators adjacent. | ||
| Batch("Union", Once, | ||
| CombineUnions) :: | ||
| Batch("OptimizeLimitZero", Once, | ||
| OptimizeLimitZero) :: | ||
| // Run this once earlier. This might simplify the plan and reduce cost of optimizer. | ||
| // For example, a query such as Filter(LocalRelation) would go through all the heavy | ||
| // optimizer rules that are triggered when there is a filter | ||
|
|
@@ -1711,3 +1713,37 @@ object RemoveRepetitionFromGroupExpressions extends Rule[LogicalPlan] { | |
| } | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Replaces GlobalLimit 0 and LocalLimit 0 nodes (subtree) with empty Local Relation, as they don't | ||
| * return any rows. | ||
| */ | ||
| object OptimizeLimitZero extends Rule[LogicalPlan] { | ||
| // returns empty Local Relation corresponding to given plan | ||
| private def empty(plan: LogicalPlan) = | ||
| LocalRelation(plan.output, data = Seq.empty, isStreaming = plan.isStreaming) | ||
|
|
||
| def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { | ||
| // Nodes below GlobalLimit or LocalLimit can be pruned if the limit value is zero (0). | ||
| // Any subtree in the logical plan that has GlobalLimit 0 or LocalLimit 0 as its root is | ||
| // semantically equivalent to an empty relation. | ||
| // | ||
| // In such cases, the effects of Limit 0 can be propagated through the Logical Plan by replacing | ||
| // the (Global/Local) Limit subtree with an empty LocalRelation, thereby pruning the subtree | ||
| // below and triggering other optimization rules of PropagateEmptyRelation to propagate the | ||
| // changes up the Logical Plan. | ||
| // | ||
| // Replace Global Limit 0 nodes with empty Local Relation | ||
| case gl @ GlobalLimit(IntegerLiteral(limit), _) if limit == 0 => | ||
|
||
| empty(gl) | ||
|
|
||
| // Note: For all SQL queries, if a LocalLimit 0 node exists in the Logical Plan, then a | ||
| // GlobalLimit 0 node would also exist. Thus, the above case would be sufficient to handle | ||
| // almost all cases. However, if a user explicitly creates a Logical Plan with LocalLimit 0 node | ||
| // then the following rule will handle that case as well. | ||
| // | ||
| // Replace Local Limit 0 nodes with empty Local Relation | ||
| case ll @ LocalLimit(IntegerLiteral(limit), _) if limit == 0 => | ||
|
||
| empty(ll) | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,109 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.spark.sql.catalyst.optimizer | ||
|
|
||
| import org.apache.spark.sql.Row | ||
| import org.apache.spark.sql.catalyst.dsl.expressions._ | ||
| import org.apache.spark.sql.catalyst.dsl.plans._ | ||
| import org.apache.spark.sql.catalyst.expressions.Literal | ||
| import org.apache.spark.sql.catalyst.plans._ | ||
| import org.apache.spark.sql.catalyst.plans.logical.{Distinct, GlobalLimit, LocalLimit, LocalRelation, LogicalPlan, Project} | ||
| import org.apache.spark.sql.catalyst.rules.RuleExecutor | ||
| import org.apache.spark.sql.types.IntegerType | ||
|
|
||
| // Test class to verify correct functioning of OptimizeLimitZero rule in various scenarios | ||
| class OptimizeLimitZeroSuite extends PlanTest { | ||
| object Optimize extends RuleExecutor[LogicalPlan] { | ||
| val batches = | ||
| Batch("OptimizeLimitZero", Once, | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we can have two batches here, just the same as
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @gengliangwang Is there a specific reason for having it that way? I modelled the test suite based on |
||
| ReplaceIntersectWithSemiJoin, | ||
| OptimizeLimitZero, | ||
| PropagateEmptyRelation) :: Nil | ||
| } | ||
|
|
||
| val testRelation1 = LocalRelation.fromExternalRows(Seq('a.int), data = Seq(Row(1))) | ||
| val testRelation2 = LocalRelation.fromExternalRows(Seq('b.int), data = Seq(Row(1))) | ||
|
|
||
| test("Limit 0: return empty local relation") { | ||
| val query = testRelation1.limit(0) | ||
|
|
||
| val optimized = Optimize.execute(query.analyze) | ||
| val correctAnswer = LocalRelation('a.int) | ||
|
|
||
| comparePlans(optimized, correctAnswer) | ||
| } | ||
|
|
||
| test("Limit 0: individual LocalLimit 0 node") { | ||
| val query = LocalLimit(0, testRelation1) | ||
|
|
||
| val optimized = Optimize.execute(query.analyze) | ||
| val correctAnswer = LocalRelation('a.int) | ||
|
|
||
| comparePlans(optimized, correctAnswer) | ||
| } | ||
|
|
||
| test("Limit 0: individual GlobalLimit 0 node") { | ||
| val query = GlobalLimit(0, testRelation1) | ||
|
|
||
| val optimized = Optimize.execute(query.analyze) | ||
| val correctAnswer = LocalRelation('a.int) | ||
|
|
||
| comparePlans(optimized, correctAnswer) | ||
| } | ||
|
|
||
| Seq( | ||
| (Inner, LocalRelation('a.int, 'b.int)), | ||
| (LeftOuter, Project(Seq('a, Literal(null).cast(IntegerType).as('b)), testRelation1).analyze), | ||
| (RightOuter, LocalRelation('a.int, 'b.int)), | ||
| (FullOuter, Project(Seq('a, Literal(null).cast(IntegerType).as('b)), testRelation1).analyze) | ||
| ).foreach { case (jt, answer) => | ||
| test(s"Limit 0: for join type $jt") { | ||
| val query = testRelation1 | ||
| .join(testRelation2.limit(0), joinType = jt, condition = Some('a.attr == 'b.attr)) | ||
|
|
||
| val optimized = Optimize.execute(query.analyze) | ||
| val correctAnswer = answer | ||
|
||
|
|
||
| comparePlans(optimized, correctAnswer) | ||
| } | ||
| } | ||
|
|
||
| test("Limit 0: 3-way join") { | ||
| val testRelation3 = LocalRelation.fromExternalRows(Seq('c.int), data = Seq(Row(1))) | ||
|
|
||
| val subJoinQuery = testRelation1 | ||
| .join(testRelation2, joinType = Inner, condition = Some('a.attr == 'b.attr)) | ||
| val query = subJoinQuery | ||
| .join(testRelation3.limit(0), joinType = Inner, condition = Some('a.attr == 'c.attr)) | ||
|
|
||
| val optimized = Optimize.execute(query.analyze) | ||
| val correctAnswer = LocalRelation('a.int, 'b.int, 'c.int) | ||
|
|
||
| comparePlans(optimized, correctAnswer) | ||
| } | ||
|
|
||
| test("Limit 0: intersect") { | ||
| val query = testRelation1 | ||
| .intersect(testRelation1.limit(0), isAll = false) | ||
|
|
||
| val optimized = Optimize.execute(query.analyze) | ||
| val correctAnswer = Distinct(LocalRelation('a.int)) | ||
|
|
||
| comparePlans(optimized, correctAnswer) | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fix the alignment here ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed