From 9fc933e5e8e0d52ddf5912fc1bc4cc32f9d8b0d3 Mon Sep 17 00:00:00 2001 From: Yifei Huang Date: Thu, 18 Apr 2019 16:16:00 -0700 Subject: [PATCH] [SPARK-27514] Skip collapsing windows with empty window expressions --- .../spark/sql/catalyst/optimizer/Optimizer.scala | 1 + .../sql/catalyst/optimizer/CollapseWindowSuite.scala | 11 +++++++++++ 2 files changed, 12 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index afdf61ef322d..f32f2c7986dc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -770,6 +770,7 @@ object CollapseWindow extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { case w1 @ Window(we1, ps1, os1, w2 @ Window(we2, ps2, os2, grandChild)) if ps1 == ps2 && os1 == os2 && w1.references.intersect(w2.windowOutputSet).isEmpty && + we1.nonEmpty && we2.nonEmpty && // This assumes Window contains the same type of window expressions. This is ensured // by ExtractWindowFunctions. WindowFunctionType.functionType(we1.head) == WindowFunctionType.functionType(we2.head) => diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala index 52054c2f8bd8..3b3b4907eea8 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala @@ -89,4 +89,15 @@ class CollapseWindowSuite extends PlanTest { val optimized = Optimize.execute(query.analyze) comparePlans(optimized, expected) } + + test("Skip windows with empty window expressions") { + val query = testRelation + .window(Seq(), partitionSpec1, orderSpec1) + .window(Seq(sum(a).as('sum_a)), partitionSpec1, orderSpec1) + + val optimized = Optimize.execute(query.analyze) + val correctAnswer = query.analyze + + comparePlans(optimized, correctAnswer) + } }