From f444b2cf0f2a41149a2d052c34e8a5058d257c53 Mon Sep 17 00:00:00 2001 From: ptkool Date: Mon, 8 May 2017 06:33:13 -0400 Subject: [PATCH 01/10] Add new optimization rule to flip adjacent Window expressions. --- .../sql/catalyst/optimizer/Optimizer.scala | 14 +++ .../catalyst/optimizer/FlipWindowSuite.scala | 97 +++++++++++++++++++ 2 files changed, 111 insertions(+) create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FlipWindowSuite.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index d82af94dbffb..38307edbaac2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -88,6 +88,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog) CollapseRepartition, CollapseProject, CollapseWindow, + FlipWindow, CombineFilters, CombineLimits, CombineUnions, @@ -624,6 +625,19 @@ object CollapseWindow extends Rule[LogicalPlan] { } } +/** + * Flip Adjacent Window Expressions. + * - If the partition spec of the parent Window expression is a subset of the partition spec + * of the child window expression, flip them. + */ +object FlipWindow extends Rule[LogicalPlan] { + def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { + case w1 @ Window(we1, ps1, os1, w2 @ Window(we2, ps2, os2, grandChild)) + if ps2.containsSlice(ps1) => + Window(we2, ps2, os2, Window(we1, ps1, os1, grandChild)) + } +} + /** * Generate a list of additional filters from an operator's existing constraint but remove those * that are either already part of the operator's condition or are part of the operator's child diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FlipWindowSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FlipWindowSuite.scala new file mode 100644 index 000000000000..3e8fe481251b --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FlipWindowSuite.scala @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions.{RowFrame, SpecifiedWindowFrame, UnboundedFollowing, UnboundedPreceding, UnspecifiedFrame} +import org.apache.spark.sql.catalyst.plans.PlanTest +import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.RuleExecutor + + +class FlipWindowSuite extends PlanTest { + object Optimize extends RuleExecutor[LogicalPlan] { + val batches = + Batch("CollapseProject", FixedPoint(100), CollapseProject, RemoveRedundantProject) :: + Batch("FlipWindow", Once, CollapseWindow, FlipWindow) :: Nil + } + + val testRelation = LocalRelation('a.string, 'b.string, 'c.int, 'd.string) + + val a = testRelation.output(0) + val b = testRelation.output(1) + val c = testRelation.output(2) + val d = testRelation.output(3) + + val partitionSpec1 = Seq(a) + val partitionSpec2 = Seq(a, b) + val partitionSpec3 = Seq(d) + + val orderSpec1 = Seq(d.asc) + val orderSpec2 = Seq(d.desc) + + test("flip two adjacent windows with compatible partitions in multiple selects") { + val query = testRelation + .select('a, 'b, 'c, windowExpr(sum('c), windowSpec(partitionSpec2, Seq.empty, UnspecifiedFrame)).as('sum_a_2)) + .select('a, 'b, 'c, 'sum_a_2, windowExpr(sum('c), windowSpec(partitionSpec1, Seq.empty, UnspecifiedFrame)).as('sum_a_1)) + + val analyzed = query.analyze + val optimized = Optimize.execute(analyzed) + + val query2 = testRelation + .select('a, 'b, 'c, windowExpr(sum('c), windowSpec(partitionSpec2, Seq.empty, UnspecifiedFrame)).as('sum_a_2), + windowExpr(sum('c), windowSpec(partitionSpec1, Seq.empty, UnspecifiedFrame)).as('sum_a_1) + ) + + val correctAnswer = Optimize.execute(query2.analyze) + + comparePlans(optimized, correctAnswer) + } + + test("flip two adjacent windows with compatible partitions") { + val query = testRelation + .window(Seq(sum(c).as('sum_a_2)), partitionSpec2, orderSpec2) + .window(Seq(sum(c).as('sum_a_1)), partitionSpec1, orderSpec1) + + val analyzed = query.analyze + val optimized = Optimize.execute(analyzed) + + val correctAnswer = testRelation + .window(Seq(sum(c).as('sum_a_1)), partitionSpec1, orderSpec1) + .window(Seq(sum(c).as('sum_a_2)), partitionSpec2, orderSpec2) + + comparePlans(optimized, correctAnswer.analyze) + } + + test("don't flip two adjacent windows with incompatible partitions") { + val query = testRelation + .window(Seq(sum(c).as('sum_a_2)), partitionSpec3, Seq.empty) + .window(Seq(sum(c).as('sum_a_1)), partitionSpec1, Seq.empty) + + val analyzed = query.analyze + val optimized = Optimize.execute(analyzed) + + val correctAnswer = testRelation + .window(Seq(sum(c).as('sum_a_2)), partitionSpec3, Seq.empty) + .window(Seq(sum(c).as('sum_a_1)), partitionSpec1, Seq.empty) + + comparePlans(optimized, correctAnswer.analyze) + } + +} From 6f8c036bb1c009f868bbce55cf4665ea40eb88bf Mon Sep 17 00:00:00 2001 From: ptkool Date: Tue, 9 May 2017 08:36:48 -0400 Subject: [PATCH 02/10] Changes based on feedback. --- .../sql/catalyst/optimizer/Optimizer.scala | 10 ++++----- ...Suite.scala => TransposeWindowSuite.scala} | 15 ++++++------- .../sql/DataFrameWindowFunctionsSuite.scala | 21 +++++++++++++++++++ 3 files changed, 34 insertions(+), 12 deletions(-) rename sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/{FlipWindowSuite.scala => TransposeWindowSuite.scala} (86%) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 38307edbaac2..ae43e58d20ad 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -88,7 +88,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog) CollapseRepartition, CollapseProject, CollapseWindow, - FlipWindow, + TransposeWindow, CombineFilters, CombineLimits, CombineUnions, @@ -626,15 +626,15 @@ object CollapseWindow extends Rule[LogicalPlan] { } /** - * Flip Adjacent Window Expressions. + * Transpose Adjacent Window Expressions. * - If the partition spec of the parent Window expression is a subset of the partition spec - * of the child window expression, flip them. + * of the child window expression, transpose them. */ -object FlipWindow extends Rule[LogicalPlan] { +object TransposeWindow extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { case w1 @ Window(we1, ps1, os1, w2 @ Window(we2, ps2, os2, grandChild)) if ps2.containsSlice(ps1) => - Window(we2, ps2, os2, Window(we1, ps1, os1, grandChild)) + Project(w1.output, Window(we2, ps2, os2, Window(we1, ps1, os1, grandChild))) } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FlipWindowSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TransposeWindowSuite.scala similarity index 86% rename from sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FlipWindowSuite.scala rename to sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TransposeWindowSuite.scala index 3e8fe481251b..95ede99d7414 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FlipWindowSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TransposeWindowSuite.scala @@ -25,11 +25,11 @@ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} import org.apache.spark.sql.catalyst.rules.RuleExecutor -class FlipWindowSuite extends PlanTest { +class TransposeWindowSuite extends PlanTest { object Optimize extends RuleExecutor[LogicalPlan] { val batches = Batch("CollapseProject", FixedPoint(100), CollapseProject, RemoveRedundantProject) :: - Batch("FlipWindow", Once, CollapseWindow, FlipWindow) :: Nil + Batch("FlipWindow", Once, CollapseWindow, TransposeWindow) :: Nil } val testRelation = LocalRelation('a.string, 'b.string, 'c.int, 'd.string) @@ -51,13 +51,13 @@ class FlipWindowSuite extends PlanTest { .select('a, 'b, 'c, windowExpr(sum('c), windowSpec(partitionSpec2, Seq.empty, UnspecifiedFrame)).as('sum_a_2)) .select('a, 'b, 'c, 'sum_a_2, windowExpr(sum('c), windowSpec(partitionSpec1, Seq.empty, UnspecifiedFrame)).as('sum_a_1)) - val analyzed = query.analyze - val optimized = Optimize.execute(analyzed) + val optimized = Optimize.execute(query.analyze) val query2 = testRelation - .select('a, 'b, 'c, windowExpr(sum('c), windowSpec(partitionSpec2, Seq.empty, UnspecifiedFrame)).as('sum_a_2), - windowExpr(sum('c), windowSpec(partitionSpec1, Seq.empty, UnspecifiedFrame)).as('sum_a_1) - ) + .select('a, 'b, 'c) + .select('a, 'b, 'c, windowExpr(sum('c), windowSpec(partitionSpec1, Seq.empty, UnspecifiedFrame)).as('sum_a_1)) + .select('a, 'b, 'c, windowExpr(sum('c), windowSpec(partitionSpec2, Seq.empty, UnspecifiedFrame)).as('sum_a_2), 'sum_a_1) + .select('a, 'b, 'c, 'sum_a_2, 'sum_a_1) val correctAnswer = Optimize.execute(query2.analyze) @@ -75,6 +75,7 @@ class FlipWindowSuite extends PlanTest { val correctAnswer = testRelation .window(Seq(sum(c).as('sum_a_1)), partitionSpec1, orderSpec1) .window(Seq(sum(c).as('sum_a_2)), partitionSpec2, orderSpec2) + .select('a, 'b, 'c, 'd, 'sum_a_2, 'sum_a_1) comparePlans(optimized, correctAnswer.analyze) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala index 204858fa2978..573551390734 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala @@ -468,4 +468,25 @@ class DataFrameWindowFunctionsSuite extends QueryTest with SharedSQLContext { spark.read.schema(sampleSchema).json(input.toDS()).select(c0, c1).foreach { _ => () } } } + + test("window functions in multiple selects") { + val df = Seq( + ("S1", "P1", 100), + ("S1", "P1", 700), + ("S2", "P1", 200), + ("S2", "P2", 300) + ).toDF("sno", "pno", "qty") + + val w1 = Window.partitionBy("sno") + val w2 = Window.partitionBy("sno", "pno") + + checkAnswer( + df.select($"sno", $"pno", $"qty", sum($"qty").over(w2).alias("sum_qty_2")) + .select($"sno", $"pno", $"qty", col("sum_qty_2"), sum("qty").over(w1).alias("sum_qty_1")), + Seq( + Row("S1", "P1", 100, 800, 800), + Row("S1", "P1", 700, 800, 800), + Row("S2", "P1", 200, 200, 500), + Row("S2", "P2", 300, 300, 500))) + } } From b7793cdf28283ff91bbcba9221a851930eec87b7 Mon Sep 17 00:00:00 2001 From: ptkool Date: Tue, 9 May 2017 08:43:58 -0400 Subject: [PATCH 03/10] Resolve Scala style errors. --- .../sql/catalyst/optimizer/TransposeWindowSuite.scala | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TransposeWindowSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TransposeWindowSuite.scala index 95ede99d7414..61a98ff9e80c 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TransposeWindowSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TransposeWindowSuite.scala @@ -47,16 +47,19 @@ class TransposeWindowSuite extends PlanTest { val orderSpec2 = Seq(d.desc) test("flip two adjacent windows with compatible partitions in multiple selects") { + val wexpr1 = windowExpr(sum('c), windowSpec(partitionSpec2, Seq.empty, UnspecifiedFrame)) + val wexpr2 = windowExpr(sum('c), windowSpec(partitionSpec1, Seq.empty, UnspecifiedFrame)) + val query = testRelation - .select('a, 'b, 'c, windowExpr(sum('c), windowSpec(partitionSpec2, Seq.empty, UnspecifiedFrame)).as('sum_a_2)) - .select('a, 'b, 'c, 'sum_a_2, windowExpr(sum('c), windowSpec(partitionSpec1, Seq.empty, UnspecifiedFrame)).as('sum_a_1)) + .select('a, 'b, 'c, wexpr1.as('sum_a_2)) + .select('a, 'b, 'c, 'sum_a_2, wexpr2.as('sum_a_1)) val optimized = Optimize.execute(query.analyze) val query2 = testRelation .select('a, 'b, 'c) - .select('a, 'b, 'c, windowExpr(sum('c), windowSpec(partitionSpec1, Seq.empty, UnspecifiedFrame)).as('sum_a_1)) - .select('a, 'b, 'c, windowExpr(sum('c), windowSpec(partitionSpec2, Seq.empty, UnspecifiedFrame)).as('sum_a_2), 'sum_a_1) + .select('a, 'b, 'c, wexpr2.as('sum_a_1)) + .select('a, 'b, 'c, wexpr1.as('sum_a_2), 'sum_a_1) .select('a, 'b, 'c, 'sum_a_2, 'sum_a_1) val correctAnswer = Optimize.execute(query2.analyze) From f183b780e888a02bbcf024a7bd9dc8031e38282b Mon Sep 17 00:00:00 2001 From: ptkool Date: Tue, 9 May 2017 08:55:01 -0400 Subject: [PATCH 04/10] Resolve more Scala style issues. --- .../spark/sql/catalyst/optimizer/TransposeWindowSuite.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TransposeWindowSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TransposeWindowSuite.scala index 61a98ff9e80c..3a7b4afe6eac 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TransposeWindowSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TransposeWindowSuite.scala @@ -51,15 +51,15 @@ class TransposeWindowSuite extends PlanTest { val wexpr2 = windowExpr(sum('c), windowSpec(partitionSpec1, Seq.empty, UnspecifiedFrame)) val query = testRelation - .select('a, 'b, 'c, wexpr1.as('sum_a_2)) + .select('a, 'b, 'c, wexpr1.as('sum_a_2)) .select('a, 'b, 'c, 'sum_a_2, wexpr2.as('sum_a_1)) val optimized = Optimize.execute(query.analyze) val query2 = testRelation .select('a, 'b, 'c) - .select('a, 'b, 'c, wexpr2.as('sum_a_1)) - .select('a, 'b, 'c, wexpr1.as('sum_a_2), 'sum_a_1) + .select('a, 'b, 'c, wexpr2.as('sum_a_1)) + .select('a, 'b, 'c, wexpr1.as('sum_a_2), 'sum_a_1) .select('a, 'b, 'c, 'sum_a_2, 'sum_a_1) val correctAnswer = Optimize.execute(query2.analyze) From 1f21841b0b826e9fca357b8c8b88d4f6ff4e4269 Mon Sep 17 00:00:00 2001 From: ptkool Date: Tue, 9 May 2017 14:10:24 -0400 Subject: [PATCH 05/10] Small tweak to rule. --- .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index ae43e58d20ad..0ce1fb6115bf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -633,7 +633,7 @@ object CollapseWindow extends Rule[LogicalPlan] { object TransposeWindow extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { case w1 @ Window(we1, ps1, os1, w2 @ Window(we2, ps2, os2, grandChild)) - if ps2.containsSlice(ps1) => + if ps1.length < ps2.length && ps2.containsSlice(ps1) => Project(w1.output, Window(we2, ps2, os2, Window(we1, ps1, os1, grandChild))) } } From 9a28535983b35b0c71baf2c5c70d2d00a2f813cc Mon Sep 17 00:00:00 2001 From: ptkool Date: Sat, 20 May 2017 08:37:26 -0400 Subject: [PATCH 06/10] Changes based on feedback. --- .../sql/catalyst/optimizer/Optimizer.scala | 10 ++++- .../optimizer/TransposeWindowSuite.scala | 44 ++++++++----------- 2 files changed, 26 insertions(+), 28 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 0ce1fb6115bf..19930dc6920d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -627,13 +627,19 @@ object CollapseWindow extends Rule[LogicalPlan] { /** * Transpose Adjacent Window Expressions. - * - If the partition spec of the parent Window expression is a subset of the partition spec + * - If the partition spec of the parent Window expression is compatible with the partition spec * of the child window expression, transpose them. */ object TransposeWindow extends Rule[LogicalPlan] { + private def compatibleParititions(ps1 : Seq[Expression], ps2: Seq[Expression]): Boolean = { + ps1.length < ps2.length && ps2.take(ps1.length).permutations.exists(ps1.zip(_).forall { + case (l, r) => l.semanticEquals(r) + }) + } + def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { case w1 @ Window(we1, ps1, os1, w2 @ Window(we2, ps2, os2, grandChild)) - if ps1.length < ps2.length && ps2.containsSlice(ps1) => + if w1.references.intersect(w2.windowOutputSet).isEmpty && compatibleParititions(ps1, ps2) => Project(w1.output, Window(we2, ps2, os2, Window(we1, ps1, os1, grandChild))) } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TransposeWindowSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TransposeWindowSuite.scala index 3a7b4afe6eac..1872302fb31e 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TransposeWindowSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TransposeWindowSuite.scala @@ -42,32 +42,12 @@ class TransposeWindowSuite extends PlanTest { val partitionSpec1 = Seq(a) val partitionSpec2 = Seq(a, b) val partitionSpec3 = Seq(d) + val partitionSpec4 = Seq(b, a, d) val orderSpec1 = Seq(d.asc) val orderSpec2 = Seq(d.desc) - test("flip two adjacent windows with compatible partitions in multiple selects") { - val wexpr1 = windowExpr(sum('c), windowSpec(partitionSpec2, Seq.empty, UnspecifiedFrame)) - val wexpr2 = windowExpr(sum('c), windowSpec(partitionSpec1, Seq.empty, UnspecifiedFrame)) - - val query = testRelation - .select('a, 'b, 'c, wexpr1.as('sum_a_2)) - .select('a, 'b, 'c, 'sum_a_2, wexpr2.as('sum_a_1)) - - val optimized = Optimize.execute(query.analyze) - - val query2 = testRelation - .select('a, 'b, 'c) - .select('a, 'b, 'c, wexpr2.as('sum_a_1)) - .select('a, 'b, 'c, wexpr1.as('sum_a_2), 'sum_a_1) - .select('a, 'b, 'c, 'sum_a_2, 'sum_a_1) - - val correctAnswer = Optimize.execute(query2.analyze) - - comparePlans(optimized, correctAnswer) - } - - test("flip two adjacent windows with compatible partitions") { + test("transpose two adjacent windows with compatible partitions") { val query = testRelation .window(Seq(sum(c).as('sum_a_2)), partitionSpec2, orderSpec2) .window(Seq(sum(c).as('sum_a_1)), partitionSpec1, orderSpec1) @@ -83,19 +63,31 @@ class TransposeWindowSuite extends PlanTest { comparePlans(optimized, correctAnswer.analyze) } - test("don't flip two adjacent windows with incompatible partitions") { + test("transpose two adjacent windows with differently ordered compatible partitions") { val query = testRelation - .window(Seq(sum(c).as('sum_a_2)), partitionSpec3, Seq.empty) - .window(Seq(sum(c).as('sum_a_1)), partitionSpec1, Seq.empty) + .window(Seq(sum(c).as('sum_a_2)), partitionSpec4, Seq.empty) + .window(Seq(sum(c).as('sum_a_1)), partitionSpec2, Seq.empty) val analyzed = query.analyze val optimized = Optimize.execute(analyzed) val correctAnswer = testRelation + .window(Seq(sum(c).as('sum_a_1)), partitionSpec2, Seq.empty) + .window(Seq(sum(c).as('sum_a_2)), partitionSpec4, Seq.empty) + .select('a, 'b, 'c, 'd, 'sum_a_2, 'sum_a_1) + + comparePlans(optimized, correctAnswer.analyze) + } + + test("don't transpose two adjacent windows with incompatible partitions") { + val query = testRelation .window(Seq(sum(c).as('sum_a_2)), partitionSpec3, Seq.empty) .window(Seq(sum(c).as('sum_a_1)), partitionSpec1, Seq.empty) - comparePlans(optimized, correctAnswer.analyze) + val analyzed = query.analyze + val optimized = Optimize.execute(analyzed) + + comparePlans(optimized, analyzed) } } From 76f1721bcd29159256a5c921490234150687bb69 Mon Sep 17 00:00:00 2001 From: ptkool Date: Wed, 12 Jul 2017 11:16:39 -0400 Subject: [PATCH 07/10] New test case. --- .../sql/catalyst/optimizer/Optimizer.scala | 5 +++- .../optimizer/TransposeWindowSuite.scala | 25 +++++++++++++++++-- 2 files changed, 27 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 19930dc6920d..369f0982aa5d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -639,7 +639,10 @@ object TransposeWindow extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { case w1 @ Window(we1, ps1, os1, w2 @ Window(we2, ps2, os2, grandChild)) - if w1.references.intersect(w2.windowOutputSet).isEmpty && compatibleParititions(ps1, ps2) => + if w1.references.intersect(w2.windowOutputSet).isEmpty && + w1.expressions.forall(_.deterministic) && + w2.expressions.forall(_.deterministic) && + compatibleParititions(ps1, ps2) => Project(w1.output, Window(we2, ps2, os2, Window(we1, ps1, os1, grandChild))) } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TransposeWindowSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TransposeWindowSuite.scala index 1872302fb31e..58b3d1c98f3c 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TransposeWindowSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TransposeWindowSuite.scala @@ -19,12 +19,11 @@ package org.apache.spark.sql.catalyst.optimizer import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ -import org.apache.spark.sql.catalyst.expressions.{RowFrame, SpecifiedWindowFrame, UnboundedFollowing, UnboundedPreceding, UnspecifiedFrame} +import org.apache.spark.sql.catalyst.expressions.Rand import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} import org.apache.spark.sql.catalyst.rules.RuleExecutor - class TransposeWindowSuite extends PlanTest { object Optimize extends RuleExecutor[LogicalPlan] { val batches = @@ -90,4 +89,26 @@ class TransposeWindowSuite extends PlanTest { comparePlans(optimized, analyzed) } + test("don't transpose two adjacent windows with intersection of partition and output set") { + val query = testRelation + .window(Seq(('a + 'b).as('e), sum(c).as('sum_a_2)), partitionSpec3, Seq.empty) + .window(Seq(sum(c).as('sum_a_1)), Seq(a, 'e), Seq.empty) + + val analyzed = query.analyze + val optimized = Optimize.execute(analyzed) + + comparePlans(optimized, analyzed) + } + + test("don't transpose two adjacent windows with non-deterministic expressions") { + val query = testRelation + .window(Seq(Rand(0).as('e), sum(c).as('sum_a_2)), partitionSpec3, Seq.empty) + .window(Seq(sum(c).as('sum_a_1)), partitionSpec1, Seq.empty) + + val analyzed = query.analyze + val optimized = Optimize.execute(analyzed) + + comparePlans(optimized, analyzed) + } + } From f840c69231025b53d057ec7e1032750abfa856b8 Mon Sep 17 00:00:00 2001 From: ptkool Date: Sun, 29 Oct 2017 08:34:27 -0400 Subject: [PATCH 08/10] Fix scalastyle errors --- .../spark/sql/catalyst/optimizer/Optimizer.scala | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 369f0982aa5d..c61b2b14d18e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -620,8 +620,8 @@ object CollapseRepartition extends Rule[LogicalPlan] { object CollapseWindow extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { case w1 @ Window(we1, ps1, os1, w2 @ Window(we2, ps2, os2, grandChild)) - if ps1 == ps2 && os1 == os2 && w1.references.intersect(w2.windowOutputSet).isEmpty => - w1.copy(windowExpressions = we2 ++ we1, child = grandChild) + if ps1 == ps2 && os1 == os2 && w1.references.intersect(w2.windowOutputSet).isEmpty => + w1.copy(windowExpressions = we2 ++ we1, child = grandChild) } } @@ -639,11 +639,11 @@ object TransposeWindow extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { case w1 @ Window(we1, ps1, os1, w2 @ Window(we2, ps2, os2, grandChild)) - if w1.references.intersect(w2.windowOutputSet).isEmpty && - w1.expressions.forall(_.deterministic) && - w2.expressions.forall(_.deterministic) && - compatibleParititions(ps1, ps2) => - Project(w1.output, Window(we2, ps2, os2, Window(we1, ps1, os1, grandChild))) + if w1.references.intersect(w2.windowOutputSet).isEmpty && + w1.expressions.forall(_.deterministic) && + w2.expressions.forall(_.deterministic) && + compatibleParititions(ps1, ps2) => + Project(w1.output, Window(we2, ps2, os2, Window(we1, ps1, os1, grandChild))) } } From e9f6928bb60e7c4de25324e5572f105a30d16cd5 Mon Sep 17 00:00:00 2001 From: ptkool Date: Sun, 29 Oct 2017 08:52:23 -0400 Subject: [PATCH 09/10] Fix another scalastyle error --- .../org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala index 573551390734..6d94fc6f1fae 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala @@ -468,7 +468,7 @@ class DataFrameWindowFunctionsSuite extends QueryTest with SharedSQLContext { spark.read.schema(sampleSchema).json(input.toDS()).select(c0, c1).foreach { _ => () } } } - + test("window functions in multiple selects") { val df = Seq( ("S1", "P1", 100), From 432883117b8d067a821422d68fcfc3a03ca36527 Mon Sep 17 00:00:00 2001 From: ptkool Date: Fri, 7 Sep 2018 12:09:30 -0400 Subject: [PATCH 10/10] Remove style changes --- .../sql/catalyst/optimizer/Optimizer.scala | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 7ef14a17acef..b432ce24e1ef 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -726,10 +726,10 @@ object CollapseRepartition extends Rule[LogicalPlan] { object CollapseWindow extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { case w1 @ Window(we1, ps1, os1, w2 @ Window(we2, ps2, os2, grandChild)) - if ps1 == ps2 && os1 == os2 && w1.references.intersect(w2.windowOutputSet).isEmpty && - // This assumes Window contains the same type of window expressions. This is ensured - // by ExtractWindowFunctions. - WindowFunctionType.functionType(we1.head) == WindowFunctionType.functionType(we2.head) => + if ps1 == ps2 && os1 == os2 && w1.references.intersect(w2.windowOutputSet).isEmpty && + // This assumes Window contains the same type of window expressions. This is ensured + // by ExtractWindowFunctions. + WindowFunctionType.functionType(we1.head) == WindowFunctionType.functionType(we2.head) => w1.copy(windowExpressions = we2 ++ we1, child = grandChild) } } @@ -748,11 +748,11 @@ object TransposeWindow extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { case w1 @ Window(we1, ps1, os1, w2 @ Window(we2, ps2, os2, grandChild)) - if w1.references.intersect(w2.windowOutputSet).isEmpty && - w1.expressions.forall(_.deterministic) && - w2.expressions.forall(_.deterministic) && - compatibleParititions(ps1, ps2) => - Project(w1.output, Window(we2, ps2, os2, Window(we1, ps1, os1, grandChild))) + if w1.references.intersect(w2.windowOutputSet).isEmpty && + w1.expressions.forall(_.deterministic) && + w2.expressions.forall(_.deterministic) && + compatibleParititions(ps1, ps2) => + Project(w1.output, Window(we2, ps2, os2, Window(we1, ps1, os1, grandChild))) } }