diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 51a3facf40835..36b167ba9044f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -608,7 +608,6 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog { } } checkCollectedMetrics(plan) - checkOffsetOperator(plan) extendedCheckRules.foreach(_(plan)) plan.foreachUp { case o if !o.resolved => @@ -851,30 +850,6 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog { check(plan) } - /** - * Validate whether the [[Offset]] is valid. - */ - private def checkOffsetOperator(plan: LogicalPlan): Unit = { - plan.foreachUp { - case o if !o.isInstanceOf[GlobalLimit] && !o.isInstanceOf[LocalLimit] - && o.children.exists(_.isInstanceOf[Offset]) => - failAnalysis( - s""" - |The OFFSET clause is only allowed in the LIMIT clause, but the OFFSET - |clause found in: ${o.nodeName}.""".stripMargin.replace("\n", " ")) - case _ => - } - plan match { - case Offset(offsetExpr, _) => - checkLimitLikeClause("offset", offsetExpr) - failAnalysis( - s""" - |The OFFSET clause is only allowed in the LIMIT clause, but the OFFSET - |clause is found to be the outermost node.""".stripMargin.replace("\n", " ")) - case _ => - } - } - /** * Validates to make sure the outer references appearing inside the subquery * are allowed. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 2e4c5973cd149..5739812a653f7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -281,6 +281,7 @@ abstract class Optimizer(catalogManager: CatalogManager) // However, because we also use the analyzer to canonicalized queries (for view definition), // we do not eliminate subqueries or compute current time in the analyzer. private val rules = Seq( + CheckOffsetOperator, EliminateResolvedHint, EliminateSubqueryAliases, EliminateView, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala index ef9c4b9af40d3..0ff6ed37d1008 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala @@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.catalyst.trees.TreePattern._ import org.apache.spark.sql.catalyst.util.DateTimeUtils.{convertSpecialDate, convertSpecialTimestamp, convertSpecialTimestampNTZ} import org.apache.spark.sql.connector.catalog.CatalogManager +import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.types._ import org.apache.spark.util.Utils @@ -138,3 +139,25 @@ object SpecialDatetimeValues extends Rule[LogicalPlan] { } } } + +/** + * Validate whether the [[Offset]] is valid. Dataset API eagerly analyzes the query plan, so a + * query plan may contains invalid [[Offset]] operators but it's not the final query plan that + * gets evaluated. + */ +object CheckOffsetOperator extends Rule[LogicalPlan] { + def apply(plan: LogicalPlan): LogicalPlan = { + plan.foreachUp { + case o if !o.isInstanceOf[GlobalLimit] && !o.isInstanceOf[LocalLimit] + && o.children.exists(_.isInstanceOf[Offset]) => + throw QueryCompilationErrors.invalidOffsetError(s"in: ${o.nodeName}") + case _ => + } + plan match { + case Offset(_, _) => + throw QueryCompilationErrors.invalidOffsetError("to be the outermost node") + case _ => + } + plan + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala index dac6a6e731f39..499a884294d81 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala @@ -2394,4 +2394,11 @@ object QueryCompilationErrors extends QueryErrorsBase { def noSuchFunctionError(database: String, funcInfo: String): Throwable = { new AnalysisException(s"$database does not support function: $funcInfo") } + + def invalidOffsetError(reason: String): Throwable = { + new AnalysisException( + s""" + |The OFFSET clause is only allowed in the LIMIT clause, but the OFFSET + |clause is found $reason.""".stripMargin.replace("\n", " ")) + } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala index 486123d2a882a..aa1195a4f00e5 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala @@ -556,20 +556,6 @@ class AnalysisErrorSuite extends AnalysisTest { "The offset expression must be equal to or greater than 0, but got -1" :: Nil ) - errorTest( - "OFFSET clause is outermost node", - testRelation.offset(Literal(10, IntegerType)), - "The OFFSET clause is only allowed in the LIMIT clause, but the OFFSET" + - " clause is found to be the outermost node." :: Nil - ) - - errorTest( - "OFFSET clause in other node", - testRelation2.offset(Literal(10, IntegerType)).where('b > 1), - "The OFFSET clause is only allowed in the LIMIT clause, but the OFFSET" + - " clause found in: Filter." :: Nil - ) - errorTest( "the sum of num_rows in limit clause and num_rows in offset clause less than Int.MaxValue", testRelation.offset(Literal(2000000000, IntegerType)).limit(Literal(1000000000, IntegerType)), diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerSuite.scala index 6b63f860b7da9..a40f98bd2f415 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerSuite.scala @@ -18,10 +18,11 @@ package org.apache.spark.sql.catalyst.optimizer import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.dsl.plans._ -import org.apache.spark.sql.catalyst.expressions.{Alias, IntegerLiteral, Literal} +import org.apache.spark.sql.catalyst.expressions.{Alias, EqualTo, IntegerLiteral, Literal} import org.apache.spark.sql.catalyst.plans.PlanTest -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation, Project} +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Offset, OneRowRelation, Project} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.internal.SQLConf @@ -71,4 +72,28 @@ class OptimizerSuite extends PlanTest { s"test, please set '${SQLConf.OPTIMIZER_MAX_ITERATIONS.key}' to a larger value.")) } } + + test("Test CheckOffsetOperator") { + val optimizer = new SimpleTestOptimizer() + + val analyzed1 = + Offset(Literal(2), Project(Alias(Literal(5), "attr")() :: Nil, OneRowRelation())).analyze + val message1 = intercept[AnalysisException] { + optimizer.execute(analyzed1) + }.getMessage + assert(message1.equals( + s""" + |The OFFSET clause is only allowed in the LIMIT clause, but the OFFSET + |clause is found to be the outermost node.""".stripMargin.replace("\n", " "))) + + val analyzed2 = + Filter(EqualTo(UnresolvedAttribute(Seq("attr")), Literal("alex")), analyzed1).analyze + val message2 = intercept[AnalysisException] { + optimizer.execute(analyzed2) + }.getMessage + assert(message2.equals( + s""" + |The OFFSET clause is only allowed in the LIMIT clause, but the OFFSET + |clause is found in: ${analyzed2.nodeName}.""".stripMargin.replace("\n", " "))) + } }