From c57f4e8eb4c1a3676f41155b1c72d29c18adc335 Mon Sep 17 00:00:00 2001 From: "tanel.kiis@gmail.com" Date: Sun, 3 Jan 2021 18:04:14 +0200 Subject: [PATCH 1/6] Eliminate distinct from more aggregates --- .../sql/catalyst/optimizer/Optimizer.scala | 4 +- .../optimizer/EliminateDistinctSuite.scala | 43 ++++++++++--------- 2 files changed, 26 insertions(+), 21 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 47260cfb59bb1..889c208eba47d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -351,7 +351,9 @@ object EliminateDistinct extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = plan transformExpressions { case ae: AggregateExpression if ae.isDistinct => ae.aggregateFunction match { - case _: Max | _: Min => ae.copy(isDistinct = false) + case _: Max | _: Min | _: BitAndAgg | _: BitOrAgg => ae.copy(isDistinct = false) + case _: First | _: Last | _: HyperLogLogPlusPlus => ae.copy(isDistinct = false) + case _: CollectSet => ae.copy(isDistinct = false) case _ => ae } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateDistinctSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateDistinctSuite.scala index 51c751923e414..8ae2c6a4dd28d 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateDistinctSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateDistinctSuite.scala @@ -18,6 +18,8 @@ package org.apache.spark.sql.catalyst.optimizer import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} import org.apache.spark.sql.catalyst.rules.RuleExecutor @@ -32,25 +34,26 @@ class EliminateDistinctSuite extends PlanTest { val testRelation = LocalRelation('a.int) - test("Eliminate Distinct in Max") { - val query = testRelation - .select(maxDistinct('a).as('result)) - .analyze - val answer = testRelation - .select(max('a).as('result)) - .analyze - assert(query != answer) - comparePlans(Optimize.execute(query), answer) - } - - test("Eliminate Distinct in Min") { - val query = testRelation - .select(minDistinct('a).as('result)) - .analyze - val answer = testRelation - .select(min('a).as('result)) - .analyze - assert(query != answer) - comparePlans(Optimize.execute(query), answer) + Seq( + ("max", Max(_)), + ("min", Min(_)), + ("approx_count_distinct", HyperLogLogPlusPlus(_: Expression)), + ("first", First(_, ignoreNulls = true)), + ("last", Last(_, ignoreNulls = true)), + ("bit_and", BitAndAgg(_)), + ("bit_or", BitOrAgg(_)), + ("collect_set", CollectSet(_: Expression)) + ).foreach { + case (name, agg) => + test(s"Eliminate Distinct in $name") { + val query = testRelation + .select(agg('a).toAggregateExpression(isDistinct = true).as('result)) + .analyze + val answer = testRelation + .select(agg('a).toAggregateExpression(isDistinct = false).as('result)) + .analyze + assert(query != answer) + comparePlans(Optimize.execute(query), answer) + } } } From 0057d57f0e0c2d7951bf785eabdd9066e26b0d6b Mon Sep 17 00:00:00 2001 From: "tanel.kiis@gmail.com" Date: Sun, 3 Jan 2021 18:04:14 +0200 Subject: [PATCH 2/6] Eliminate distinct from more aggregates --- .../sql/catalyst/optimizer/Optimizer.scala | 21 ++++++--- .../optimizer/EliminateDistinctSuite.scala | 43 ++++++++++--------- 2 files changed, 39 insertions(+), 25 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 47260cfb59bb1..709a7da61f8c3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -349,11 +349,22 @@ abstract class Optimizer(catalogManager: CatalogManager) */ object EliminateDistinct extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = plan transformExpressions { - case ae: AggregateExpression if ae.isDistinct => - ae.aggregateFunction match { - case _: Max | _: Min => ae.copy(isDistinct = false) - case _ => ae - } + case ae: AggregateExpression if ae.isDistinct && isDuplicateAgnostic(ae.aggregateFunction) => + ae.copy(isDistinct = false) + } + + private def isDuplicateAgnostic(af: AggregateFunction): Boolean = { + af match { + case _: Max => true + case _: Min => true + case _: BitAndAgg => true + case _: BitOrAgg => true + case _: First => true + case _: Last => true + case _: HyperLogLogPlusPlus => true + case _: CollectSet => true + case _ => false + } } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateDistinctSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateDistinctSuite.scala index 51c751923e414..8ae2c6a4dd28d 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateDistinctSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateDistinctSuite.scala @@ -18,6 +18,8 @@ package org.apache.spark.sql.catalyst.optimizer import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} import org.apache.spark.sql.catalyst.rules.RuleExecutor @@ -32,25 +34,26 @@ class EliminateDistinctSuite extends PlanTest { val testRelation = LocalRelation('a.int) - test("Eliminate Distinct in Max") { - val query = testRelation - .select(maxDistinct('a).as('result)) - .analyze - val answer = testRelation - .select(max('a).as('result)) - .analyze - assert(query != answer) - comparePlans(Optimize.execute(query), answer) - } - - test("Eliminate Distinct in Min") { - val query = testRelation - .select(minDistinct('a).as('result)) - .analyze - val answer = testRelation - .select(min('a).as('result)) - .analyze - assert(query != answer) - comparePlans(Optimize.execute(query), answer) + Seq( + ("max", Max(_)), + ("min", Min(_)), + ("approx_count_distinct", HyperLogLogPlusPlus(_: Expression)), + ("first", First(_, ignoreNulls = true)), + ("last", Last(_, ignoreNulls = true)), + ("bit_and", BitAndAgg(_)), + ("bit_or", BitOrAgg(_)), + ("collect_set", CollectSet(_: Expression)) + ).foreach { + case (name, agg) => + test(s"Eliminate Distinct in $name") { + val query = testRelation + .select(agg('a).toAggregateExpression(isDistinct = true).as('result)) + .analyze + val answer = testRelation + .select(agg('a).toAggregateExpression(isDistinct = false).as('result)) + .analyze + assert(query != answer) + comparePlans(Optimize.execute(query), answer) + } } } From d1cd01d1a9319acbc15cf7095663ccef4544365d Mon Sep 17 00:00:00 2001 From: "tanel.kiis@gmail.com" Date: Sun, 3 Jan 2021 21:01:30 +0200 Subject: [PATCH 3/6] Eliminate distinct from more aggregates --- .../sql/catalyst/optimizer/Optimizer.scala | 22 +++++++++---------- 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 709a7da61f8c3..bc0a7223b7779 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -353,18 +353,16 @@ object EliminateDistinct extends Rule[LogicalPlan] { ae.copy(isDistinct = false) } - private def isDuplicateAgnostic(af: AggregateFunction): Boolean = { - af match { - case _: Max => true - case _: Min => true - case _: BitAndAgg => true - case _: BitOrAgg => true - case _: First => true - case _: Last => true - case _: HyperLogLogPlusPlus => true - case _: CollectSet => true - case _ => false - } + private def isDuplicateAgnostic(af: AggregateFunction): Boolean = af match { + case _: Max => true + case _: Min => true + case _: BitAndAgg => true + case _: BitOrAgg => true + case _: First => true + case _: Last => true + case _: HyperLogLogPlusPlus => true + case _: CollectSet => true + case _ => false } } From 5f9df99f950875755e4f20557d08fab392fa8097 Mon Sep 17 00:00:00 2001 From: "tanel.kiis@gmail.com" Date: Sat, 9 Jan 2021 14:20:19 +0200 Subject: [PATCH 4/6] Remove HLL++ --- .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 1 - .../spark/sql/catalyst/optimizer/EliminateDistinctSuite.scala | 1 - 2 files changed, 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index bc0a7223b7779..aa4e208f5c0cf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -360,7 +360,6 @@ object EliminateDistinct extends Rule[LogicalPlan] { case _: BitOrAgg => true case _: First => true case _: Last => true - case _: HyperLogLogPlusPlus => true case _: CollectSet => true case _ => false } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateDistinctSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateDistinctSuite.scala index 8ae2c6a4dd28d..99e00fc8717ff 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateDistinctSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateDistinctSuite.scala @@ -37,7 +37,6 @@ class EliminateDistinctSuite extends PlanTest { Seq( ("max", Max(_)), ("min", Min(_)), - ("approx_count_distinct", HyperLogLogPlusPlus(_: Expression)), ("first", First(_, ignoreNulls = true)), ("last", Last(_, ignoreNulls = true)), ("bit_and", BitAndAgg(_)), From 4b9d04b6b94e38e1b601e15dc66fbd56ed7c8f87 Mon Sep 17 00:00:00 2001 From: "tanel.kiis@gmail.com" Date: Sat, 9 Jan 2021 15:25:18 +0200 Subject: [PATCH 5/6] Refactor test --- .../optimizer/EliminateDistinctSuite.scala | 25 ++++++++++--------- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateDistinctSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateDistinctSuite.scala index 99e00fc8717ff..dd5ba9e10e28f 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateDistinctSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateDistinctSuite.scala @@ -35,24 +35,25 @@ class EliminateDistinctSuite extends PlanTest { val testRelation = LocalRelation('a.int) Seq( - ("max", Max(_)), - ("min", Min(_)), - ("first", First(_, ignoreNulls = true)), - ("last", Last(_, ignoreNulls = true)), - ("bit_and", BitAndAgg(_)), - ("bit_or", BitOrAgg(_)), - ("collect_set", CollectSet(_: Expression)) + Max(_), + Min(_), + First(_, ignoreNulls = true), + Last(_, ignoreNulls = true), + BitAndAgg(_), + BitOrAgg(_), + CollectSet(_: Expression) ).foreach { - case (name, agg) => - test(s"Eliminate Distinct in $name") { + aggBuilder => + val agg = aggBuilder('a) + test(s"Eliminate Distinct in ${agg.prettyName}") { val query = testRelation - .select(agg('a).toAggregateExpression(isDistinct = true).as('result)) + .select(agg.toAggregateExpression(isDistinct = true).as('result)) .analyze val answer = testRelation - .select(agg('a).toAggregateExpression(isDistinct = false).as('result)) + .select(agg.toAggregateExpression(isDistinct = false).as('result)) .analyze assert(query != answer) comparePlans(Optimize.execute(query), answer) - } + } } } From f95fb8d89d4993c2470a5d4bd8408a9dd4147bcb Mon Sep 17 00:00:00 2001 From: "tanel.kiis@gmail.com" Date: Tue, 26 Jan 2021 19:04:10 +0200 Subject: [PATCH 6/6] Remove first and last --- .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 2 -- .../spark/sql/catalyst/optimizer/EliminateDistinctSuite.scala | 2 -- 2 files changed, 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index aa4e208f5c0cf..4226a1b0aea73 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -358,8 +358,6 @@ object EliminateDistinct extends Rule[LogicalPlan] { case _: Min => true case _: BitAndAgg => true case _: BitOrAgg => true - case _: First => true - case _: Last => true case _: CollectSet => true case _ => false } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateDistinctSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateDistinctSuite.scala index dd5ba9e10e28f..0848d5609ff02 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateDistinctSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateDistinctSuite.scala @@ -37,8 +37,6 @@ class EliminateDistinctSuite extends PlanTest { Seq( Max(_), Min(_), - First(_, ignoreNulls = true), - Last(_, ignoreNulls = true), BitAndAgg(_), BitOrAgg(_), CollectSet(_: Expression)