From f06e8395f9b89cbe911a0404afe237495bb1e86d Mon Sep 17 00:00:00 2001 From: Kris Mok Date: Fri, 25 Jan 2019 16:24:28 -0800 Subject: [PATCH 1/4] verify plan integrity: special expressions AggregateExpression, WindowExpression and Generator should only be hosted in corresponding operators --- .../sql/catalyst/optimizer/Optimizer.scala | 34 ++++++++++++++++++- ...mizerStructuralIntegrityCheckerSuite.scala | 22 ++++++++++-- 2 files changed, 52 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 20f1221be425..7878ab4223de 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -41,7 +41,39 @@ abstract class Optimizer(sessionCatalog: SessionCatalog) // Check for structural integrity of the plan in test mode. Currently we only check if a plan is // still resolved after the execution of each rule. override protected def isPlanIntegral(plan: LogicalPlan): Boolean = { - !Utils.isTesting || plan.resolved + !Utils.isTesting || (plan.resolved && checkSpecialExpressionIntegrity(plan)) + } + + /** + * Check if all operators in this plan hold structural integrity with regards to hosting special + * expressions. + * Returns true when all operators are integral. + */ + private def checkSpecialExpressionIntegrity(plan: LogicalPlan): Boolean = { + plan.collectFirst { + case p if specialExpressionInUnsupportedOperator(p) => p + }.isEmpty + } + + /** + * Check if there's any expression in this query plan operator that is + * - A WindowExpression but the plan is not Window + * - An AggregateExpresion but the plan is not Aggregate or Window + * - A Generator but the plan is not Generate + * Returns true when this operator breaks structural integrity with one of the cases above. + */ + private def specialExpressionInUnsupportedOperator(plan: LogicalPlan): Boolean = { + val exprs = plan.expressions + exprs.flatMap { root => + root.collectFirst { + case e: WindowExpression + if !plan.isInstanceOf[Window] => e + case e: AggregateExpression + if !(plan.isInstanceOf[Aggregate] || plan.isInstanceOf[Window]) => e + case e: Generator + if !plan.isInstanceOf[Generate] => e + } + }.nonEmpty } protected def fixedPoint = FixedPoint(SQLConf.get.optimizerMaxIterations) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerStructuralIntegrityCheckerSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerStructuralIntegrityCheckerSuite.scala index a22a81e9844d..b8e34e586ea4 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerStructuralIntegrityCheckerSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerStructuralIntegrityCheckerSuite.scala @@ -19,11 +19,12 @@ package org.apache.spark.sql.catalyst.optimizer import org.apache.spark.sql.catalyst.analysis.{EmptyFunctionRegistry, UnresolvedAttribute} import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog} +import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.errors.TreeNodeException -import org.apache.spark.sql.catalyst.expressions.{Alias, Literal} +import org.apache.spark.sql.catalyst.expressions.{Alias, Literal, NamedExpression} import org.apache.spark.sql.catalyst.plans.PlanTest -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation, Project} +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LocalRelation, LogicalPlan, OneRowRelation, Project} import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.internal.SQLConf @@ -35,6 +36,8 @@ class OptimizerStructuralIntegrityCheckerSuite extends PlanTest { case Project(projectList, child) => val newAttr = UnresolvedAttribute("unresolvedAttr") Project(projectList ++ Seq(newAttr), child) + case agg @ Aggregate(Nil, aggregateExpressions, child) => + Project(aggregateExpressions, child) } } @@ -47,7 +50,7 @@ class OptimizerStructuralIntegrityCheckerSuite extends PlanTest { override def defaultBatches: Seq[Batch] = Seq(newBatch) ++ super.defaultBatches } - test("check for invalid plan after execution of rule") { + test("check for invalid plan after execution of rule - unresolved attribute") { val analyzed = Project(Alias(Literal(10), "attr")() :: Nil, OneRowRelation()).analyze assert(analyzed.resolved) val message = intercept[TreeNodeException[LogicalPlan]] { @@ -57,4 +60,17 @@ class OptimizerStructuralIntegrityCheckerSuite extends PlanTest { assert(message.contains(s"After applying rule $ruleName in batch OptimizeRuleBreakSI")) assert(message.contains("the structural integrity of the plan is broken")) } + + test("check for invalid plan after execution of rule - special expression in wrong operator") { + val analyzed = + Aggregate(Nil, Seq[NamedExpression](max('id) as 'm), + LocalRelation('id.long)).analyze + assert(analyzed.resolved) + val message = intercept[TreeNodeException[LogicalPlan]] { + Optimize.execute(analyzed) + }.getMessage + val ruleName = OptimizeRuleBreakSI.ruleName + assert(message.contains(s"After applying rule $ruleName in batch OptimizeRuleBreakSI")) + assert(message.contains("the structural integrity of the plan is broken")) + } } From c7ccc42c18d46d638c42c4fe13f8494393908f2f Mon Sep 17 00:00:00 2001 From: Kris Mok Date: Sat, 26 Jan 2019 00:59:31 -0800 Subject: [PATCH 2/4] Address comments --- .../sql/catalyst/optimizer/Optimizer.scala | 19 ++++++++++--------- .../sql/catalyst/rules/RuleExecutor.scala | 7 +++++++ ...mizerStructuralIntegrityCheckerSuite.scala | 18 ++++++++++++++++++ 3 files changed, 35 insertions(+), 9 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 7878ab4223de..38a051c15476 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -38,8 +38,10 @@ import org.apache.spark.util.Utils abstract class Optimizer(sessionCatalog: SessionCatalog) extends RuleExecutor[LogicalPlan] { - // Check for structural integrity of the plan in test mode. Currently we only check if a plan is - // still resolved after the execution of each rule. + // Check for structural integrity of the plan in test mode. + // Currently we check after the execution of each rule if a plan: + // - is still resolved + // - only host special expressions in supported operators override protected def isPlanIntegral(plan: LogicalPlan): Boolean = { !Utils.isTesting || (plan.resolved && checkSpecialExpressionIntegrity(plan)) } @@ -50,9 +52,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog) * Returns true when all operators are integral. */ private def checkSpecialExpressionIntegrity(plan: LogicalPlan): Boolean = { - plan.collectFirst { - case p if specialExpressionInUnsupportedOperator(p) => p - }.isEmpty + plan.find(specialExpressionInUnsupportedOperator).isEmpty } /** @@ -65,13 +65,14 @@ abstract class Optimizer(sessionCatalog: SessionCatalog) private def specialExpressionInUnsupportedOperator(plan: LogicalPlan): Boolean = { val exprs = plan.expressions exprs.flatMap { root => - root.collectFirst { + root.find { case e: WindowExpression - if !plan.isInstanceOf[Window] => e + if !plan.isInstanceOf[Window] => true case e: AggregateExpression - if !(plan.isInstanceOf[Aggregate] || plan.isInstanceOf[Window]) => e + if !(plan.isInstanceOf[Aggregate] || plan.isInstanceOf[Window]) => true case e: Generator - if !plan.isInstanceOf[Generate] => e + if !plan.isInstanceOf[Generate] => true + case _ => false } }.nonEmpty } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala index cf6ff4f98639..088f1fecb645 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala @@ -88,6 +88,13 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging { val planChangeLogger = new PlanChangeLogger() val tracker: Option[QueryPlanningTracker] = QueryPlanningTracker.get + // Run the structural integrity checker against the initial input + if (!isPlanIntegral(plan)) { + val message = "The structural integrity of the input plan is broken in " + + s"${this.getClass.getName.stripSuffix("$")}." + throw new TreeNodeException(plan, message, null) + } + batches.foreach { batch => val batchStartPlan = curPlan var iteration = 1 diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerStructuralIntegrityCheckerSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerStructuralIntegrityCheckerSuite.scala index b8e34e586ea4..20e1383d2595 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerStructuralIntegrityCheckerSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerStructuralIntegrityCheckerSuite.scala @@ -66,11 +66,29 @@ class OptimizerStructuralIntegrityCheckerSuite extends PlanTest { Aggregate(Nil, Seq[NamedExpression](max('id) as 'm), LocalRelation('id.long)).analyze assert(analyzed.resolved) + + // Should fail verification with the OptimizeRuleBreakSI rule val message = intercept[TreeNodeException[LogicalPlan]] { Optimize.execute(analyzed) }.getMessage val ruleName = OptimizeRuleBreakSI.ruleName assert(message.contains(s"After applying rule $ruleName in batch OptimizeRuleBreakSI")) assert(message.contains("the structural integrity of the plan is broken")) + + // Should not fail verification with the regular optimizer + SimpleTestOptimizer.execute(analyzed) + } + + test("check for invalid plan before execution of any rule") { + val analyzed = + Aggregate(Nil, Seq[NamedExpression](max('id) as 'm), + LocalRelation('id.long)).analyze + val invalidPlan = OptimizeRuleBreakSI.apply(analyzed) + + // Should fail verification right at the beginning + val message = intercept[TreeNodeException[LogicalPlan]] { + Optimize.execute(invalidPlan) + }.getMessage + assert(message.contains("The structural integrity of the input plan is broken")) } } From 4c34e98973b63323013f65d652fd7c14ab00c0fb Mon Sep 17 00:00:00 2001 From: Kris Mok Date: Sat, 26 Jan 2019 12:57:37 -0800 Subject: [PATCH 3/4] Disable failing test case in SubquerySuite. It should be re-enabled after fixing SPARK-26741 --- .../src/test/scala/org/apache/spark/sql/SubquerySuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala index 48c167660913..8ade43300987 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala @@ -1186,7 +1186,7 @@ class SubquerySuite extends QueryTest with SharedSQLContext { } } - test("SPARK-23957 Remove redundant sort from subquery plan(scalar subquery)") { + ignore("SPARK-23957 Remove redundant sort from subquery plan(scalar subquery)") { withTempView("t1", "t2", "t3") { Seq((1, 1), (2, 2)).toDF("c1", "c2").createOrReplaceTempView("t1") Seq((1, 1), (2, 2)).toDF("c1", "c2").createOrReplaceTempView("t2") From c1d38dd2fc05237131baaac443a493593c2fa863 Mon Sep 17 00:00:00 2001 From: Kris Mok Date: Sat, 26 Jan 2019 17:05:08 -0800 Subject: [PATCH 4/4] Address Xiao's comment and fix test --- ...mizerStructuralIntegrityCheckerSuite.scala | 1 + .../catalyst/trees/RuleExecutorSuite.scala | 20 ++++++++++++++++++- 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerStructuralIntegrityCheckerSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerStructuralIntegrityCheckerSuite.scala index 20e1383d2595..5e0d2041fac5 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerStructuralIntegrityCheckerSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerStructuralIntegrityCheckerSuite.scala @@ -37,6 +37,7 @@ class OptimizerStructuralIntegrityCheckerSuite extends PlanTest { val newAttr = UnresolvedAttribute("unresolvedAttr") Project(projectList ++ Seq(newAttr), child) case agg @ Aggregate(Nil, aggregateExpressions, child) => + // Project cannot host AggregateExpression Project(aggregateExpressions, child) } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/RuleExecutorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/RuleExecutorSuite.scala index a67f54b263cc..ab5d722975ef 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/RuleExecutorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/RuleExecutorSuite.scala @@ -57,7 +57,7 @@ class RuleExecutorSuite extends SparkFunSuite { assert(message.contains("Max iterations (10) reached for batch fixedPoint")) } - test("structural integrity checker") { + test("structural integrity checker - verify initial input") { object WithSIChecker extends RuleExecutor[Expression] { override protected def isPlanIntegral(expr: Expression): Boolean = expr match { case IntegerLiteral(_) => true @@ -69,8 +69,26 @@ class RuleExecutorSuite extends SparkFunSuite { assert(WithSIChecker.execute(Literal(10)) === Literal(9)) val message = intercept[TreeNodeException[LogicalPlan]] { + // The input is already invalid as determined by WithSIChecker.isPlanIntegral WithSIChecker.execute(Literal(10.1)) }.getMessage + assert(message.contains("The structural integrity of the input plan is broken")) + } + + test("structural integrity checker - verify rule execution result") { + object WithSICheckerForPositiveLiteral extends RuleExecutor[Expression] { + override protected def isPlanIntegral(expr: Expression): Boolean = expr match { + case IntegerLiteral(i) if i > 0 => true + case _ => false + } + val batches = Batch("once", Once, DecrementLiterals) :: Nil + } + + assert(WithSICheckerForPositiveLiteral.execute(Literal(2)) === Literal(1)) + + val message = intercept[TreeNodeException[LogicalPlan]] { + WithSICheckerForPositiveLiteral.execute(Literal(1)) + }.getMessage assert(message.contains("the structural integrity of the plan is broken")) } }