Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,6 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
}
}
checkCollectedMetrics(plan)
checkOffsetOperator(plan)
extendedCheckRules.foreach(_(plan))
plan.foreachUp {
case o if !o.resolved =>
Expand Down Expand Up @@ -851,30 +850,6 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
check(plan)
}

/**
 * Rejects plans in which an [[Offset]] operator is not the direct child of a [[GlobalLimit]]
 * or [[LocalLimit]], including the case where the [[Offset]] is the root of the plan.
 */
private def checkOffsetOperator(plan: LogicalPlan): Unit = {
  // True when at least one direct child of `node` is an Offset operator.
  def hasOffsetChild(node: LogicalPlan): Boolean = node.children.exists {
    case _: Offset => true
    case _ => false
  }

  plan.foreachUp {
    // A limit operator is the only parent an Offset is allowed to have.
    case _: GlobalLimit | _: LocalLimit =>
    case parent if hasOffsetChild(parent) =>
      failAnalysis(
        s"""
           |The OFFSET clause is only allowed in the LIMIT clause, but the OFFSET
           |clause found in: ${parent.nodeName}.""".stripMargin.replace("\n", " "))
    case _ =>
  }

  // The traversal above only inspects children, so a root-level Offset is handled separately.
  plan match {
    case Offset(offsetExpr, _) =>
      // The offset expression is handed to checkLimitLikeClause before the placement error.
      checkLimitLikeClause("offset", offsetExpr)
      failAnalysis(
        s"""
           |The OFFSET clause is only allowed in the LIMIT clause, but the OFFSET
           |clause is found to be the outermost node.""".stripMargin.replace("\n", " "))
    case _ =>
  }
}

/**
* Validates to make sure the outer references appearing inside the subquery
* are allowed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ abstract class Optimizer(catalogManager: CatalogManager)
// However, because we also use the analyzer to canonicalize queries (for view definition),
// we do not eliminate subqueries or compute current time in the analyzer.
private val rules = Seq(
CheckOffsetOperator,
EliminateResolvedHint,
EliminateSubqueryAliases,
EliminateView,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.catalyst.trees.TreePattern._
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{convertSpecialDate, convertSpecialTimestamp, convertSpecialTimestampNTZ}
import org.apache.spark.sql.connector.catalog.CatalogManager
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils

Expand Down Expand Up @@ -138,3 +139,25 @@ object SpecialDatetimeValues extends Rule[LogicalPlan] {
}
}
}

/**
 * Validates the placement of [[Offset]] operators. The Dataset API eagerly analyzes the query
 * plan, so a query plan may contain invalid [[Offset]] operators without being the final query
 * plan that gets evaluated.
 *
 * The rule never rewrites the plan: it either throws or returns its input unchanged.
 */
object CheckOffsetOperator extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = {
    def isOffset(node: LogicalPlan): Boolean = node match {
      case _: Offset => true
      case _ => false
    }

    plan.foreachUp {
      // An Offset directly beneath a limit operator is the only accepted placement.
      case _: GlobalLimit | _: LocalLimit =>
      case parent if parent.children.exists(isOffset) =>
        throw QueryCompilationErrors.invalidOffsetError(s"in: ${parent.nodeName}")
      case _ =>
    }

    // The traversal above only inspects children, so the root is checked on its own.
    if (isOffset(plan)) {
      throw QueryCompilationErrors.invalidOffsetError("to be the outermost node")
    }
    plan
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -2394,4 +2394,11 @@ object QueryCompilationErrors extends QueryErrorsBase {
/**
 * Creates the [[AnalysisException]] stating that `database` does not support the function
 * described by `funcInfo`.
 */
def noSuchFunctionError(database: String, funcInfo: String): Throwable = {
  val message = s"$database does not support function: $funcInfo"
  new AnalysisException(message)
}

/**
 * Creates the [[AnalysisException]] for an OFFSET clause that is not placed inside a LIMIT
 * clause.
 *
 * @param reason text appended after "clause is found", describing where the OFFSET was found
 */
def invalidOffsetError(reason: String): Throwable = {
  // The template opens with a newline, which is turned into a space together with every other
  // newline (including any inside `reason`); the resulting message therefore starts with a space.
  val message =
    s"""
       |The OFFSET clause is only allowed in the LIMIT clause, but the OFFSET
       |clause is found $reason.""".stripMargin.replace("\n", " ")
  new AnalysisException(message)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -556,20 +556,6 @@ class AnalysisErrorSuite extends AnalysisTest {
"The offset expression must be equal to or greater than 0, but got -1" :: Nil
)

// Case: an OFFSET applied directly to a relation, with no enclosing LIMIT.
// NOTE(review): errorTest is defined outside this view; presumably it analyzes the given plan
// and checks that the failure message contains each listed fragment -- confirm.
errorTest(
"OFFSET clause is outermost node",
testRelation.offset(Literal(10, IntegerType)),
"The OFFSET clause is only allowed in the LIMIT clause, but the OFFSET" +
" clause is found to be the outermost node." :: Nil
)

// Case: an OFFSET whose parent is a Filter (built with `where`) rather than a LIMIT.
// The expected wording here is "clause found in:", without "is".
errorTest(
"OFFSET clause in other node",
testRelation2.offset(Literal(10, IntegerType)).where('b > 1),
"The OFFSET clause is only allowed in the LIMIT clause, but the OFFSET" +
" clause found in: Filter." :: Nil
)

errorTest(
"the sum of num_rows in limit clause and num_rows in offset clause less than Int.MaxValue",
testRelation.offset(Literal(2000000000, IntegerType)).limit(Literal(1000000000, IntegerType)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@
package org.apache.spark.sql.catalyst.optimizer

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions.{Alias, IntegerLiteral, Literal}
import org.apache.spark.sql.catalyst.expressions.{Alias, EqualTo, IntegerLiteral, Literal}
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation, Project}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Offset, OneRowRelation, Project}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.internal.SQLConf

Expand Down Expand Up @@ -71,4 +72,28 @@ class OptimizerSuite extends PlanTest {
s"test, please set '${SQLConf.OPTIMIZER_MAX_ITERATIONS.key}' to a larger value."))
}
}

test("Test CheckOffsetOperator") {
  val optimizer = new SimpleTestOptimizer()

  // Optimizes `plan`, which is expected to be rejected, and returns the rejection message.
  def rejectionMessage(plan: LogicalPlan): String = {
    val error = intercept[AnalysisException] {
      optimizer.execute(plan)
    }
    error.getMessage
  }

  // Case 1: the Offset is the root of the plan.
  val projection = Project(Alias(Literal(5), "attr")() :: Nil, OneRowRelation())
  val analyzed1 = Offset(Literal(2), projection).analyze
  val message1 = rejectionMessage(analyzed1)
  val expectedOutermost =
    s"""
       |The OFFSET clause is only allowed in the LIMIT clause, but the OFFSET
       |clause is found to be the outermost node.""".stripMargin.replace("\n", " ")
  assert(message1 == expectedOutermost)

  // Case 2: the same Offset wrapped in a Filter, i.e. a parent that is not a limit operator.
  val condition = EqualTo(UnresolvedAttribute(Seq("attr")), Literal("alex"))
  val analyzed2 = Filter(condition, analyzed1).analyze
  val message2 = rejectionMessage(analyzed2)
  val expectedUnderParent =
    s"""
       |The OFFSET clause is only allowed in the LIMIT clause, but the OFFSET
       |clause is found in: ${analyzed2.nodeName}.""".stripMargin.replace("\n", " ")
  assert(message2 == expectedUnderParent)
}
}