From 80af01add6c0169c7cad0286afc748d845cd1327 Mon Sep 17 00:00:00 2001 From: 10129659 Date: Tue, 22 Aug 2017 15:47:27 +0800 Subject: [PATCH 01/10] The getAliasedConstraints function will take a long time when expression is greater than 100 --- .../sql/catalyst/expressions/ExpressionSet.scala | 6 ++++++ .../sql/catalyst/plans/logical/LogicalPlan.scala | 5 +++-- .../spark/sql/catalyst/plans/LogicalPlanSuite.scala | 13 ++++++++++++- 3 files changed, 21 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSet.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSet.scala index ede0b1654bbd6..d235ee7ae4cda 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSet.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSet.scala @@ -59,6 +59,12 @@ class ExpressionSet protected( } } + def addMultiExpression(elems: Set[Expression]): ExpressionSet = { + val newSet = new ExpressionSet(baseSet.clone(), originals.clone()) + elemSet.foreach(newSet.add(_)) + newSet + } + override def contains(elem: Expression): Boolean = baseSet.contains(elem.canonicalized) override def +(elem: Expression): ExpressionSet = { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index 9b440cd99f994..d97b3c7bc3bfe 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -289,14 +289,15 @@ abstract class UnaryNode extends LogicalPlan { * expressions with the corresponding alias */ protected def getAliasedConstraints(projectList: Seq[NamedExpression]): Set[Expression] = { - var allConstraints = child.constraints.asInstanceOf[Set[Expression]] + var allConstraints = child.constraints projectList.foreach { case a @ Alias(e, _) => // For every alias in `projectList`, replace the reference in constraints by its attribute. - allConstraints ++= allConstraints.map(_ transform { + val replacedElement = allConstraints.map(_ transform { case expr: Expression if expr.semanticEquals(e) => a.toAttribute }) + allConstraints = allConstraints.addMultiExpression(replacedElement) allConstraints += EqualNullSafe(e, a.toAttribute) case _ => // Don't change. } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala index cc86f1f6e2f48..234cca958eaa5 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala @@ -18,7 +18,8 @@ package org.apache.spark.sql.catalyst.plans import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} +import org.apache.spark.sql.catalyst.expressions.{Literal, Alias, Attribute, AttributeReference} +import org.apache.spark.sql.catalyst.expressions.aggregate.Count import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.types.IntegerType @@ -89,4 +90,14 @@ class LogicalPlanSuite extends SparkFunSuite { assert(TestBinaryRelation(relation, incrementalRelation).isStreaming === true) assert(TestBinaryRelation(incrementalRelation, incrementalRelation).isStreaming) } + + test("getAliasedConstraints") { + val expressionNum = 100 + val aggExpression = (1 to expressionNum).map(i => Alias(Count(Literal(1)), s"cnt$i")()) + val aggPlan = Aggregate(Nil, aggExpression, LocalRelation()) + + val expressions = aggPlan.validConstraints + // The size of Aliased expression is n * (n - 1) / 2 + n + assert( expressions.size === expressionNum * (expressionNum - 1) / 2 + expressionNum) + } } From 7444a1c5c7e8939b8d3695202d39ad0e8561b940 Mon Sep 17 00:00:00 2001 From: 10129659 Date: Tue, 22 Aug 2017 17:07:49 +0800 Subject: [PATCH 02/10] Modify name --- .../apache/spark/sql/catalyst/expressions/ExpressionSet.scala | 4 ++-- .../apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSet.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSet.scala index d235ee7ae4cda..40e85e7f7f63e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSet.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSet.scala @@ -59,9 +59,9 @@ class ExpressionSet protected( } } - def addMultiExpression(elems: Set[Expression]): ExpressionSet = { + def addMultiExpressions(elems: Set[Expression]): ExpressionSet = { val newSet = new ExpressionSet(baseSet.clone(), originals.clone()) - elemSet.foreach(newSet.add(_)) + elems.foreach(newSet.add(_)) newSet } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index d97b3c7bc3bfe..28f59f907859e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -297,7 +297,7 @@ abstract class UnaryNode extends LogicalPlan { case expr: Expression if expr.semanticEquals(e) => a.toAttribute }) - allConstraints = allConstraints.addMultiExpression(replacedElement) + allConstraints = allConstraints.addMultiExpressions(replacedElement) allConstraints += EqualNullSafe(e, a.toAttribute) case _ => // Don't change. } From 1f5e0a30f2d11034c572b11764a59b354ca079f8 Mon Sep 17 00:00:00 2001 From: 10129659 Date: Tue, 22 Aug 2017 17:09:15 +0800 Subject: [PATCH 03/10] Modify name --- .../apache/spark/sql/catalyst/expressions/ExpressionSet.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSet.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSet.scala index 40e85e7f7f63e..f592a37d6e4cf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSet.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSet.scala @@ -61,7 +61,7 @@ class ExpressionSet protected( def addMultiExpressions(elems: Set[Expression]): ExpressionSet = { val newSet = new ExpressionSet(baseSet.clone(), originals.clone()) - elems.foreach(newSet.add(_)) + elems.foreach(newSet.add) newSet } From 88185ffdff489fb3eda01a7c27762f5d97692f58 Mon Sep 17 00:00:00 2001 From: 10129659 Date: Tue, 22 Aug 2017 17:16:31 +0800 Subject: [PATCH 04/10] Modify name --- .../org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala index 234cca958eaa5..53a9ffa96fa04 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.catalyst.plans import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.catalyst.expressions.{Literal, Alias, Attribute, AttributeReference} +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Literal} import org.apache.spark.sql.catalyst.expressions.aggregate.Count import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.types.IntegerType From 97803ce8af44ef72e2519d34e0da5a715fd4274b Mon Sep 17 00:00:00 2001 From: 10129659 Date: Wed, 23 Aug 2017 16:58:51 +0800 Subject: [PATCH 05/10] Override plus plus --- .../sql/catalyst/expressions/ExpressionSet.scala | 14 +++++++------- .../sql/catalyst/plans/logical/LogicalPlan.scala | 3 +-- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSet.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSet.scala index f592a37d6e4cf..9da848e349988 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSet.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSet.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.catalyst.expressions -import scala.collection.mutable +import scala.collection.{GenTraversableOnce, mutable} import scala.collection.mutable.ArrayBuffer object ExpressionSet { @@ -59,12 +59,6 @@ class ExpressionSet protected( } } - def addMultiExpressions(elems: Set[Expression]): ExpressionSet = { - val newSet = new ExpressionSet(baseSet.clone(), originals.clone()) - elems.foreach(newSet.add) - newSet - } - override def contains(elem: Expression): Boolean = baseSet.contains(elem.canonicalized) override def +(elem: Expression): ExpressionSet = { @@ -73,6 +67,12 @@ class ExpressionSet protected( newSet } + override def ++(elems: GenTraversableOnce[Expression]): ExpressionSet = { + val newSet = new ExpressionSet(baseSet.clone(), originals.clone()) + elems.foreach(newSet.add) + newSet + } + override def -(elem: Expression): ExpressionSet = { val newBaseSet = baseSet.clone().filterNot(_ == elem.canonicalized) val newOriginals = originals.clone().filterNot(_.canonicalized == elem.canonicalized) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index 28f59f907859e..edebcda0680b3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -293,11 +293,10 @@ abstract class UnaryNode extends LogicalPlan { projectList.foreach { case a @ Alias(e, _) => // For every alias in `projectList`, replace the reference in constraints by its attribute. - val replacedElement = allConstraints.map(_ transform { + allConstraints ++= allConstraints.map(_ transform { case expr: Expression if expr.semanticEquals(e) => a.toAttribute }) - allConstraints = allConstraints.addMultiExpressions(replacedElement) allConstraints += EqualNullSafe(e, a.toAttribute) case _ => // Don't change. } From 835835133a4dd79136b7d02c816d0ea45184784a Mon Sep 17 00:00:00 2001 From: 10129659 Date: Wed, 23 Aug 2017 17:01:44 +0800 Subject: [PATCH 06/10] Override plus plus --- .../apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index edebcda0680b3..9b440cd99f994 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -289,7 +289,7 @@ abstract class UnaryNode extends LogicalPlan { * expressions with the corresponding alias */ protected def getAliasedConstraints(projectList: Seq[NamedExpression]): Set[Expression] = { - var allConstraints = child.constraints + var allConstraints = child.constraints.asInstanceOf[Set[Expression]] projectList.foreach { case a @ Alias(e, _) => // For every alias in `projectList`, replace the reference in constraints by its attribute. From 6ee2ffda5c5df78c4923b05f5d08806e4d9c195d Mon Sep 17 00:00:00 2001 From: 10129659 Date: Wed, 23 Aug 2017 17:19:23 +0800 Subject: [PATCH 07/10] Add expression set test --- .../sql/catalyst/expressions/ExpressionSetSuite.scala | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSetSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSetSuite.scala index d617ad540d5ff..9dc3eee8a3826 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSetSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSetSuite.scala @@ -210,4 +210,13 @@ class ExpressionSetSuite extends SparkFunSuite { assert((initialSet - (aLower + 1)).size == 0) } + + test("add multiple elements to set") { + val initialSet = ExpressionSet(aUpper + 1 :: Nil) + val setToAddWithSameExpression = ExpressionSet(aUpper + 1 :: aUpper + 2 :: Nil) + val setToAdd = ExpressionSet(aUpper + 2 :: aUpper + 3 :: Nil) + + assert((initialSet ++ setToAddWithSameExpression).size == 2) + assert((initialSet ++ setToAdd).size == 3) + } } From 57e8a3d66d5dc87c7627e0f370fe907233fa1864 Mon Sep 17 00:00:00 2001 From: 10129659 Date: Wed, 23 Aug 2017 17:31:03 +0800 Subject: [PATCH 08/10] Remove getAliasedConstraints test --- .../spark/sql/catalyst/plans/LogicalPlanSuite.scala | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala index 53a9ffa96fa04..cc86f1f6e2f48 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala @@ -18,8 +18,7 @@ package org.apache.spark.sql.catalyst.plans import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Literal} -import org.apache.spark.sql.catalyst.expressions.aggregate.Count +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.types.IntegerType @@ -90,14 +89,4 @@ class LogicalPlanSuite extends SparkFunSuite { assert(TestBinaryRelation(relation, incrementalRelation).isStreaming === true) assert(TestBinaryRelation(incrementalRelation, incrementalRelation).isStreaming) } - - test("getAliasedConstraints") { - val expressionNum = 100 - val aggExpression = (1 to expressionNum).map(i => Alias(Count(Literal(1)), s"cnt$i")()) - val aggPlan = Aggregate(Nil, aggExpression, LocalRelation()) - - val expressions = aggPlan.validConstraints - // The size of Aliased expression is n * (n - 1) / 2 + n - assert( expressions.size === expressionNum * (expressionNum - 1) / 2 + expressionNum) - } } From fec05d6249e7636fad9a9f3bb025ba57b3e8dd30 Mon Sep 17 00:00:00 2001 From: 10129659 Date: Thu, 24 Aug 2017 08:44:13 +0800 Subject: [PATCH 09/10] Change import order --- .../spark/sql/catalyst/expressions/ExpressionSetSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSetSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSetSuite.scala index 9dc3eee8a3826..a1000a0e80799 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSetSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSetSuite.scala @@ -214,9 +214,9 @@ class ExpressionSetSuite extends SparkFunSuite { test("add multiple elements to set") { val initialSet = ExpressionSet(aUpper + 1 :: Nil) val setToAddWithSameExpression = ExpressionSet(aUpper + 1 :: aUpper + 2 :: Nil) - val setToAdd = ExpressionSet(aUpper + 2 :: aUpper + 3 :: Nil) + val setToAddWithOutSameExpression = ExpressionSet(aUpper + 3 :: aUpper + 4 :: Nil) assert((initialSet ++ setToAddWithSameExpression).size == 2) - assert((initialSet ++ setToAdd).size == 3) + assert((initialSet ++ setToAddWithOutSameExpression).size == 3) } } From 07628402eeed958c45905974c82b06211f1bc934 Mon Sep 17 00:00:00 2001 From: 10129659 Date: Thu, 24 Aug 2017 08:46:03 +0800 Subject: [PATCH 10/10] Change import order --- .../apache/spark/sql/catalyst/expressions/ExpressionSet.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSet.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSet.scala index 9da848e349988..305ac90e245b8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSet.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSet.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.catalyst.expressions -import scala.collection.{GenTraversableOnce, mutable} +import scala.collection.{mutable, GenTraversableOnce} import scala.collection.mutable.ArrayBuffer object ExpressionSet {