diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 40fa228d81003..1e9c431292b9b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -433,13 +433,6 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog { case Offset(offsetExpr, _) => checkLimitLikeClause("offset", offsetExpr) - case o if !o.isInstanceOf[GlobalLimit] && !o.isInstanceOf[LocalLimit] - && o.children.exists(_.isInstanceOf[Offset]) => - failAnalysis( - s""" - |The OFFSET clause is allowed in the LIMIT clause or be the outermost node, - |but the OFFSET clause found in: ${o.nodeName}.""".stripMargin.replace("\n", " ")) - case Tail(limitExpr, _) => checkLimitLikeClause("tail", limitExpr) case _: Union | _: SetOperation if operator.children.length > 1 => diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala index 4c8e066f1d1b8..569fd57252a22 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala @@ -556,13 +556,6 @@ class AnalysisErrorSuite extends AnalysisTest { "The offset expression must be equal to or greater than 0, but got -1" :: Nil ) - errorTest( - "OFFSET clause in other node", - testRelation2.offset(Literal(10, IntegerType)).where('b > 1), - "The OFFSET clause is allowed in the LIMIT clause or be the outermost node," + - " but the OFFSET clause found in: Filter." 
:: Nil - ) - errorTest( "the sum of num_rows in limit clause and num_rows in offset clause less than Int.MaxValue", testRelation.offset(Literal(2000000000, IntegerType)).limit(Literal(1000000000, IntegerType)), diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 36b6d6b470d8a..7392d791fdb80 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -2102,6 +2102,16 @@ class Dataset[T] private[sql]( Limit(Literal(n), logicalPlan) } + /** + * Returns a new Dataset by skipping the first `n` rows. + * + * @group typedrel + * @since 3.4.0 + */ + def offset(n: Int): Dataset[T] = withTypedPlan { + Offset(Literal(n), logicalPlan) + } + /** + * Returns a new Dataset containing union of rows in this Dataset and another Dataset. + * diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 47f79c9ada70c..c5c718088f308 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -605,6 +605,30 @@ class DataFrameSuite extends QueryTest ) } + test("offset") { + checkAnswer( + testData.offset(90), + testData.collect().drop(90).toSeq) + + checkAnswer( + arrayData.toDF().offset(99), + arrayData.collect().drop(99).map(r => Row.fromSeq(r.productIterator.toSeq))) + + checkAnswer( + mapData.toDF().offset(99), + mapData.collect().drop(99).map(r => Row.fromSeq(r.productIterator.toSeq))) + } + + test("limit with offset") { + checkAnswer( + testData.limit(10).offset(5), + testData.take(10).drop(5).toSeq) + + checkAnswer( + testData.offset(5).limit(10), + testData.take(15).drop(5).toSeq) + } + test("udf") { val foo = udf((a: Int, b: String) => a.toString + b)