diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index d23d43bef76e..40b72b296704 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -115,6 +115,8 @@ class Analyzer( } } + override def verifyOnceStrategyIdempotence: Boolean = true + override def execute(plan: LogicalPlan): LogicalPlan = { AnalysisContext.reset() try { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala index dccb44ddebfa..409bd0555e48 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala @@ -65,6 +65,15 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging { */ protected def isPlanIntegral(plan: TreeType): Boolean = true + /** Whether to verify batches with once strategy stabilize after one run. */ + protected def verifyOnceStrategyIdempotence: Boolean = false + + private val knownUnstableBatches = Seq( + ("UDF", "org.apache.spark.ml.feature.StringIndexerSuite.transform"), + ("View", "org.apache.spark.sql.hive.execution.HiveCatalogedDDLSuite." + + "SPARK-22431: view with nested type") + ) + /** * Executes the batches of rules defined by the subclass. The batches are executed serially * using the defined execution strategy. Within each batch, rules are also executed serially. @@ -78,6 +87,15 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging { var iteration = 1 var lastPlan = curPlan var continue = true + // Verify that once-strategy batches stabilize after one run when testing. + val maxIterations = + if (batch.strategy.maxIterations == 1 + && verifyOnceStrategyIdempotence + && !knownUnstableBatches.exists(_._1 == batch.name)) { + 2 + } else { + batch.strategy.maxIterations + } // Run until fix point (or the max number of iterations as specified in the strategy. while (continue) { @@ -108,11 +126,19 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging { result } - iteration += 1 - if (iteration > batch.strategy.maxIterations) { - // Only log if this is a rule that is supposed to run more than once. - if (iteration != 2) { - val message = s"Max iterations (${iteration - 1}) reached for batch ${batch.name}" + + // Stop applying this batch if: + // 1) the plan hasn't changed over the last iteration; or + // 2) max number of iterations has been reached. + if (curPlan.fastEquals(lastPlan)) { + logTrace( + s"Fixed point reached for batch ${batch.name} after ${iteration} iterations.") + continue = false + } else if (iteration >= maxIterations) { + // Only log if the batch has run more than once. + if (iteration > 1) { + val message = s"Plan did not stabilize after max iterations " + + s"(${batch.strategy.maxIterations}) reached for batch ${batch.name}." if (Utils.isTesting) { throw new TreeNodeException(curPlan, message, null) } else { @@ -122,11 +148,7 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging { continue = false } - if (curPlan.fastEquals(lastPlan)) { - logTrace( - s"Fixed point reached for batch ${batch.name} after ${iteration - 1} iterations.") - continue = false - } + iteration += 1 lastPlan = curPlan } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/RuleExecutorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/RuleExecutorSuite.scala index a67f54b263cc..7172afcf1c9e 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/RuleExecutorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/RuleExecutorSuite.scala @@ -54,7 +54,9 @@ class RuleExecutorSuite extends SparkFunSuite { val message = intercept[TreeNodeException[LogicalPlan]] { ToFixedPoint.execute(Literal(100)) }.getMessage - assert(message.contains("Max iterations (10) reached for batch fixedPoint")) + assert( + message.contains( + "Plan did not stabilize after max iterations (10) reached for batch fixedPoint")) } test("structural integrity checker") { @@ -73,4 +75,17 @@ class RuleExecutorSuite extends SparkFunSuite { }.getMessage assert(message.contains("the structural integrity of the plan is broken")) } + + test("only once but did not stabilize") { + object ApplyOnce extends RuleExecutor[Expression] { + override def verifyOnceStrategyIdempotence: Boolean = true + val batches = Batch("once", Once, DecrementLiterals) :: Nil + } + + val message = intercept[TreeNodeException[LogicalPlan]] { + ApplyOnce.execute(Literal(10)) + }.getMessage + assert( + message.contains("Plan did not stabilize after max iterations (1) reached for batch once")) + } }