diff --git a/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLCommonExtension.scala b/extensions/spark/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLCommonExtension.scala
similarity index 100%
rename from extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLCommonExtension.scala
rename to extensions/spark/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLCommonExtension.scala
diff --git a/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLParser.scala b/extensions/spark/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLParser.scala
similarity index 100%
rename from extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLParser.scala
rename to extensions/spark/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLParser.scala
diff --git a/extensions/spark/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/InsertShuffleNodeBeforeJoinSuite.scala b/extensions/spark/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/InsertShuffleNodeBeforeJoinSuite.scala
new file mode 100644
index 00000000000..f0d38465734
--- /dev/null
+++ b/extensions/spark/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/InsertShuffleNodeBeforeJoinSuite.scala
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql
+
+class InsertShuffleNodeBeforeJoinSuite extends InsertShuffleNodeBeforeJoinSuiteBase
diff --git a/extensions/spark/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/ZorderSuite.scala b/extensions/spark/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/ZorderSuite.scala
new file mode 100644
index 00000000000..fd04e27dbb5
--- /dev/null
+++ b/extensions/spark/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/ZorderSuite.scala
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+class ZorderWithCodegenEnabledSuite extends ZorderWithCodegenEnabledSuiteBase {}
+
+class ZorderWithCodegenDisabledSuite extends ZorderWithCodegenDisabledSuiteBase {}
diff --git a/extensions/spark/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLCommonExtension.scala b/extensions/spark/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLCommonExtension.scala
new file mode 100644
index 00000000000..3343946e827
--- /dev/null
+++ b/extensions/spark/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLCommonExtension.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.sql
+
+import org.apache.spark.sql.SparkSessionExtensions
+
+import org.apache.kyuubi.sql.zorder.{InsertZorderBeforeWritingDatasource, InsertZorderBeforeWritingHive, ResolveZorder}
+
+class KyuubiSparkSQLCommonExtension extends (SparkSessionExtensions => Unit) {
+ override def apply(extensions: SparkSessionExtensions): Unit = {
+ KyuubiSparkSQLCommonExtension.injectCommonExtensions(extensions)
+ }
+}
+
+object KyuubiSparkSQLCommonExtension {
+ def injectCommonExtensions(extensions: SparkSessionExtensions): Unit = {
+ // inject zorder parser and related rules
+ extensions.injectParser { case (_, parser) => new SparkKyuubiSparkSQLParser(parser) }
+ extensions.injectResolutionRule(ResolveZorder)
+
+ // Note that:
+ // InsertZorderBeforeWritingDatasource and InsertZorderBeforeWritingHive
+ // should be applied before
+ // RepartitionBeforeWriting and RebalanceBeforeWriting
+ // because we can only apply one of them (i.e. Global Sort or Repartition/Rebalance)
+ extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingDatasource)
+ extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingHive)
+ extensions.injectPostHocResolutionRule(FinalStageConfigIsolationCleanRule)
+
+ extensions.injectQueryStagePrepRule(_ => InsertShuffleNodeBeforeJoin)
+ extensions.injectQueryStagePrepRule(FinalStageConfigIsolation(_))
+ }
+}
diff --git a/extensions/spark/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLParser.scala b/extensions/spark/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLParser.scala
new file mode 100644
index 00000000000..2f12a82e23e
--- /dev/null
+++ b/extensions/spark/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLParser.scala
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.sql
+
+import org.antlr.v4.runtime._
+import org.antlr.v4.runtime.atn.PredictionMode
+import org.antlr.v4.runtime.misc.{Interval, ParseCancellationException}
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.parser.{ParseErrorListener, ParseException, ParserInterface, PostProcessor}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.trees.Origin
+import org.apache.spark.sql.types.{DataType, StructType}
+
+abstract class KyuubiSparkSQLParserBase extends ParserInterface {
+ def delegate: ParserInterface
+ def astBuilder: KyuubiSparkSQLAstBuilderBase
+
+ override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { parser =>
+ astBuilder.visit(parser.singleStatement()) match {
+ case plan: LogicalPlan => plan
+ case _ => delegate.parsePlan(sqlText)
+ }
+ }
+
+ protected def parse[T](command: String)(toResult: KyuubiSparkSQLParser => T): T = {
+ val lexer = new KyuubiSparkSQLLexer(
+ new UpperCaseCharStream(CharStreams.fromString(command)))
+ lexer.removeErrorListeners()
+ lexer.addErrorListener(ParseErrorListener)
+
+ val tokenStream = new CommonTokenStream(lexer)
+ val parser = new KyuubiSparkSQLParser(tokenStream)
+ parser.addParseListener(PostProcessor)
+ parser.removeErrorListeners()
+ parser.addErrorListener(ParseErrorListener)
+
+ try {
+ try {
+ // first, try parsing with potentially faster SLL mode
+ parser.getInterpreter.setPredictionMode(PredictionMode.SLL)
+ toResult(parser)
+ } catch {
+ case _: ParseCancellationException =>
+ // if we fail, parse with LL mode
+ tokenStream.seek(0) // rewind input stream
+ parser.reset()
+
+ // Try Again.
+ parser.getInterpreter.setPredictionMode(PredictionMode.LL)
+ toResult(parser)
+ }
+ } catch {
+ case e: ParseException if e.command.isDefined =>
+ throw e
+ case e: ParseException =>
+ throw e.withCommand(command)
+ case e: AnalysisException =>
+ val position = Origin(e.line, e.startPosition)
+ throw new ParseException(Option(command), e.message, position, position)
+ }
+ }
+
+ override def parseExpression(sqlText: String): Expression = {
+ delegate.parseExpression(sqlText)
+ }
+
+ override def parseTableIdentifier(sqlText: String): TableIdentifier = {
+ delegate.parseTableIdentifier(sqlText)
+ }
+
+ override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier = {
+ delegate.parseFunctionIdentifier(sqlText)
+ }
+
+ override def parseMultipartIdentifier(sqlText: String): Seq[String] = {
+ delegate.parseMultipartIdentifier(sqlText)
+ }
+
+ override def parseTableSchema(sqlText: String): StructType = {
+ delegate.parseTableSchema(sqlText)
+ }
+
+ override def parseDataType(sqlText: String): DataType = {
+ delegate.parseDataType(sqlText)
+ }
+}
+
+class SparkKyuubiSparkSQLParser(
+ override val delegate: ParserInterface)
+ extends KyuubiSparkSQLParserBase {
+ def astBuilder: KyuubiSparkSQLAstBuilderBase = new KyuubiSparkSQLAstBuilder
+}
+
+/* Copied from Apache Spark to avoid a dependency on Spark internals */
+class UpperCaseCharStream(wrapped: CodePointCharStream) extends CharStream {
+ override def consume(): Unit = wrapped.consume()
+ override def getSourceName(): String = wrapped.getSourceName
+ override def index(): Int = wrapped.index
+ override def mark(): Int = wrapped.mark
+ override def release(marker: Int): Unit = wrapped.release(marker)
+ override def seek(where: Int): Unit = wrapped.seek(where)
+ override def size(): Int = wrapped.size
+
+ override def getText(interval: Interval): String = wrapped.getText(interval)
+
+ // scalastyle:off
+ override def LA(i: Int): Int = {
+ val la = wrapped.LA(i)
+ if (la == 0 || la == IntStream.EOF) la
+ else Character.toUpperCase(la)
+ }
+ // scalastyle:on
+}
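A quick sketch of the delegation contract above (constructing the parser directly is for illustration only; in a real session it is installed via `injectParser`, and the `OPTIMIZE ... ZORDER BY` statement is assumed from the Kyuubi grammar):

```scala
// Assumes a live SparkSession `spark` with the Kyuubi SQL grammar on the classpath.
val parser = new SparkKyuubiSparkSQLParser(spark.sessionState.sqlParser)

// Matched by the Kyuubi grammar: built by KyuubiSparkSQLAstBuilder.
val zorderPlan = parser.parsePlan("OPTIMIZE db.t ZORDER BY c1, c2")

// Not a Kyuubi statement: astBuilder.visit yields no LogicalPlan,
// so parsePlan falls back to delegate.parsePlan.
val selectPlan = parser.parsePlan("SELECT * FROM db.t")
```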
diff --git a/extensions/spark/kyuubi-extension-spark-3-2/src/test/scala/org/apache/spark/sql/InsertShuffleNodeBeforeJoinSuite.scala b/extensions/spark/kyuubi-extension-spark-3-2/src/test/scala/org/apache/spark/sql/InsertShuffleNodeBeforeJoinSuite.scala
new file mode 100644
index 00000000000..f0d38465734
--- /dev/null
+++ b/extensions/spark/kyuubi-extension-spark-3-2/src/test/scala/org/apache/spark/sql/InsertShuffleNodeBeforeJoinSuite.scala
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql
+
+class InsertShuffleNodeBeforeJoinSuite extends InsertShuffleNodeBeforeJoinSuiteBase
diff --git a/extensions/spark/kyuubi-extension-spark-3-2/src/test/scala/org/apache/spark/sql/ZorderSuite.scala b/extensions/spark/kyuubi-extension-spark-3-2/src/test/scala/org/apache/spark/sql/ZorderSuite.scala
new file mode 100644
index 00000000000..fd04e27dbb5
--- /dev/null
+++ b/extensions/spark/kyuubi-extension-spark-3-2/src/test/scala/org/apache/spark/sql/ZorderSuite.scala
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+class ZorderWithCodegenEnabledSuite extends ZorderWithCodegenEnabledSuiteBase {}
+
+class ZorderWithCodegenDisabledSuite extends ZorderWithCodegenDisabledSuiteBase {}
diff --git a/extensions/spark/kyuubi-extension-spark-3-3/pom.xml b/extensions/spark/kyuubi-extension-spark-3-3/pom.xml
new file mode 100644
index 00000000000..16f8f213af0
--- /dev/null
+++ b/extensions/spark/kyuubi-extension-spark-3-3/pom.xml
@@ -0,0 +1,159 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.kyuubi</groupId>
+        <artifactId>kyuubi-parent</artifactId>
+        <version>1.6.0-SNAPSHOT</version>
+        <relativePath>../../../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>kyuubi-extension-spark-3-3_2.12</artifactId>
+    <name>Kyuubi Dev Spark Extensions (for Spark 3.3)</name>
+    <packaging>jar</packaging>
+    <url>https://kyuubi.apache.org/</url>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.kyuubi</groupId>
+            <artifactId>kyuubi-extension-spark-common_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.kyuubi</groupId>
+            <artifactId>kyuubi-extension-spark-common_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_${scala.binary.version}</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-hive_${scala.binary.version}</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client-api</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_${scala.binary.version}</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.scalatestplus</groupId>
+            <artifactId>scalacheck-1-15_${scala.binary.version}</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client-runtime</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>commons-collections</groupId>
+            <artifactId>commons-collections</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>jakarta.xml.bind</groupId>
+            <artifactId>jakarta.xml.bind-api</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+        <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <configuration>
+                    <shadedArtifactAttached>false</shadedArtifactAttached>
+                    <artifactSet>
+                        <includes>
+                            <include>org.apache.kyuubi:kyuubi-extension-spark-common_${scala.binary.version}</include>
+                        </includes>
+                    </artifactSet>
+                </configuration>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/DropIgnoreNonexistent.scala b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/DropIgnoreNonexistent.scala
new file mode 100644
index 00000000000..e740554d443
--- /dev/null
+++ b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/DropIgnoreNonexistent.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.sql
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.{UnresolvedFunc, UnresolvedRelation, UnresolvedTableOrView, UnresolvedView}
+import org.apache.spark.sql.catalyst.plans.logical.{DropFunction, DropNamespace, DropTable, DropView, LogicalPlan, NoopCommand, UncacheTable}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.command.{AlterTableDropPartitionCommand, DropTableCommand}
+
+import org.apache.kyuubi.sql.KyuubiSQLConf._
+
+case class DropIgnoreNonexistent(session: SparkSession) extends Rule[LogicalPlan] {
+
+ override def apply(plan: LogicalPlan): LogicalPlan = {
+ if (conf.getConf(DROP_IGNORE_NONEXISTENT)) {
+ plan match {
+ case i @ AlterTableDropPartitionCommand(_, _, false, _, _) =>
+ i.copy(ifExists = true)
+ case i @ DropTableCommand(_, false, _, _) =>
+ i.copy(ifExists = true)
+ case i @ DropNamespace(_, false, _) =>
+ i.copy(ifExists = true)
+ // like: org.apache.spark.sql.catalyst.analysis.ResolveCommandsWithIfExists
+ case DropTable(u: UnresolvedTableOrView, false, _) =>
+ NoopCommand("DROP TABLE", u.multipartIdentifier)
+ case DropView(u: UnresolvedView, false) =>
+ NoopCommand("DROP VIEW", u.multipartIdentifier)
+ case UncacheTable(u: UnresolvedRelation, false, _) =>
+ NoopCommand("UNCACHE TABLE", u.multipartIdentifier)
+ case DropFunction(u: UnresolvedFunc, false) =>
+ NoopCommand("DROP FUNCTION", u.multipartIdentifier)
+ case _ => plan
+ }
+ } else {
+ plan
+ }
+ }
+
+}
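A minimal usage sketch of the rule above (session setup assumed; the conf is referenced through `KyuubiSQLConf` rather than a hard-coded key):

```scala
import org.apache.kyuubi.sql.KyuubiSQLConf

// With the flag on, dropping a missing object analyzes to a no-op
// instead of failing the statement.
spark.conf.set(KyuubiSQLConf.DROP_IGNORE_NONEXISTENT.key, "true")

spark.sql("DROP TABLE nonexistent_table") // NoopCommand("DROP TABLE", ...)
spark.sql("DROP DATABASE nonexistent_db") // DropNamespace(..., ifExists = true)
```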
diff --git a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLCommonExtension.scala b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLCommonExtension.scala
new file mode 100644
index 00000000000..3343946e827
--- /dev/null
+++ b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLCommonExtension.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.sql
+
+import org.apache.spark.sql.SparkSessionExtensions
+
+import org.apache.kyuubi.sql.zorder.{InsertZorderBeforeWritingDatasource, InsertZorderBeforeWritingHive, ResolveZorder}
+
+class KyuubiSparkSQLCommonExtension extends (SparkSessionExtensions => Unit) {
+ override def apply(extensions: SparkSessionExtensions): Unit = {
+ KyuubiSparkSQLCommonExtension.injectCommonExtensions(extensions)
+ }
+}
+
+object KyuubiSparkSQLCommonExtension {
+ def injectCommonExtensions(extensions: SparkSessionExtensions): Unit = {
+ // inject zorder parser and related rules
+ extensions.injectParser { case (_, parser) => new SparkKyuubiSparkSQLParser(parser) }
+ extensions.injectResolutionRule(ResolveZorder)
+
+ // Note that:
+ // InsertZorderBeforeWritingDatasource and InsertZorderBeforeWritingHive
+ // should be applied before
+ // RepartitionBeforeWriting and RebalanceBeforeWriting
+ // because we can only apply one of them (i.e. Global Sort or Repartition/Rebalance)
+ extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingDatasource)
+ extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingHive)
+ extensions.injectPostHocResolutionRule(FinalStageConfigIsolationCleanRule)
+
+ extensions.injectQueryStagePrepRule(_ => InsertShuffleNodeBeforeJoin)
+ extensions.injectQueryStagePrepRule(FinalStageConfigIsolation(_))
+ }
+}
diff --git a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
new file mode 100644
index 00000000000..ef9da41be13
--- /dev/null
+++ b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.sql
+
+import org.apache.spark.sql.SparkSessionExtensions
+
+import org.apache.kyuubi.sql.watchdog.{ForcedMaxOutputRowsRule, MaxPartitionStrategy}
+
+// scalastyle:off line.size.limit
+/**
+ * Built on the Spark SQL Extension framework; to use this extension, follow these steps:
+ * 1. move this jar into $SPARK_HOME/jars
+ * 2. add config into `spark-defaults.conf`: `spark.sql.extensions=org.apache.kyuubi.sql.KyuubiSparkSQLExtension`
+ */
+// scalastyle:on line.size.limit
+class KyuubiSparkSQLExtension extends (SparkSessionExtensions => Unit) {
+ override def apply(extensions: SparkSessionExtensions): Unit = {
+ KyuubiSparkSQLCommonExtension.injectCommonExtensions(extensions)
+
+ extensions.injectPostHocResolutionRule(RebalanceBeforeWritingDatasource)
+ extensions.injectPostHocResolutionRule(RebalanceBeforeWritingHive)
+ extensions.injectPostHocResolutionRule(DropIgnoreNonexistent)
+
+ // watchdog extension
+ extensions.injectOptimizerRule(ForcedMaxOutputRowsRule)
+ extensions.injectPlannerStrategy(MaxPartitionStrategy)
+ }
+}
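Step 2 from the comment above, expressed programmatically for local experiments (the master and builder usage are illustrative; `spark-defaults.conf` remains the documented route):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]") // illustrative
  .config("spark.sql.extensions", "org.apache.kyuubi.sql.KyuubiSparkSQLExtension")
  .getOrCreate()
```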
diff --git a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLParser.scala b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLParser.scala
new file mode 100644
index 00000000000..af1711ebbe7
--- /dev/null
+++ b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLParser.scala
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.sql
+
+import org.antlr.v4.runtime._
+import org.antlr.v4.runtime.atn.PredictionMode
+import org.antlr.v4.runtime.misc.{Interval, ParseCancellationException}
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.parser.{ParseErrorListener, ParseException, ParserInterface, PostProcessor}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.trees.Origin
+import org.apache.spark.sql.types.{DataType, StructType}
+
+abstract class KyuubiSparkSQLParserBase extends ParserInterface {
+ def delegate: ParserInterface
+ def astBuilder: KyuubiSparkSQLAstBuilderBase
+
+ override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { parser =>
+ astBuilder.visit(parser.singleStatement()) match {
+ case plan: LogicalPlan => plan
+ case _ => delegate.parsePlan(sqlText)
+ }
+ }
+
+ protected def parse[T](command: String)(toResult: KyuubiSparkSQLParser => T): T = {
+ val lexer = new KyuubiSparkSQLLexer(
+ new UpperCaseCharStream(CharStreams.fromString(command)))
+ lexer.removeErrorListeners()
+ lexer.addErrorListener(ParseErrorListener)
+
+ val tokenStream = new CommonTokenStream(lexer)
+ val parser = new KyuubiSparkSQLParser(tokenStream)
+ parser.addParseListener(PostProcessor)
+ parser.removeErrorListeners()
+ parser.addErrorListener(ParseErrorListener)
+
+ try {
+ try {
+ // first, try parsing with potentially faster SLL mode
+ parser.getInterpreter.setPredictionMode(PredictionMode.SLL)
+ toResult(parser)
+ } catch {
+ case _: ParseCancellationException =>
+ // if we fail, parse with LL mode
+ tokenStream.seek(0) // rewind input stream
+ parser.reset()
+
+ // Try Again.
+ parser.getInterpreter.setPredictionMode(PredictionMode.LL)
+ toResult(parser)
+ }
+ } catch {
+ case e: ParseException if e.command.isDefined =>
+ throw e
+ case e: ParseException =>
+ throw e.withCommand(command)
+ case e: AnalysisException =>
+ val position = Origin(e.line, e.startPosition)
+ throw new ParseException(Option(command), e.message, position, position)
+ }
+ }
+
+ override def parseExpression(sqlText: String): Expression = {
+ delegate.parseExpression(sqlText)
+ }
+
+ override def parseTableIdentifier(sqlText: String): TableIdentifier = {
+ delegate.parseTableIdentifier(sqlText)
+ }
+
+ override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier = {
+ delegate.parseFunctionIdentifier(sqlText)
+ }
+
+ override def parseMultipartIdentifier(sqlText: String): Seq[String] = {
+ delegate.parseMultipartIdentifier(sqlText)
+ }
+
+ override def parseTableSchema(sqlText: String): StructType = {
+ delegate.parseTableSchema(sqlText)
+ }
+
+ override def parseDataType(sqlText: String): DataType = {
+ delegate.parseDataType(sqlText)
+ }
+
+ /**
+ * This function was introduced in Spark 3.3; for more details, please see
+ * https://github.com/apache/spark/pull/34543
+ */
+ override def parseQuery(sqlText: String): LogicalPlan = {
+ delegate.parseQuery(sqlText)
+ }
+}
+
+class SparkKyuubiSparkSQLParser(
+ override val delegate: ParserInterface)
+ extends KyuubiSparkSQLParserBase {
+ def astBuilder: KyuubiSparkSQLAstBuilderBase = new KyuubiSparkSQLAstBuilder
+}
+
+/* Copied from Apache Spark to avoid a dependency on Spark internals */
+class UpperCaseCharStream(wrapped: CodePointCharStream) extends CharStream {
+ override def consume(): Unit = wrapped.consume()
+ override def getSourceName(): String = wrapped.getSourceName
+ override def index(): Int = wrapped.index
+ override def mark(): Int = wrapped.mark
+ override def release(marker: Int): Unit = wrapped.release(marker)
+ override def seek(where: Int): Unit = wrapped.seek(where)
+ override def size(): Int = wrapped.size
+
+ override def getText(interval: Interval): String = wrapped.getText(interval)
+
+ // scalastyle:off
+ override def LA(i: Int): Int = {
+ val la = wrapped.LA(i)
+ if (la == 0 || la == IntStream.EOF) la
+ else Character.toUpperCase(la)
+ }
+ // scalastyle:on
+}
diff --git a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala
new file mode 100644
index 00000000000..8f7f17c4ad9
--- /dev/null
+++ b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.sql
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical._
+
+trait RepartitionBuilderWithRebalance extends RepartitionBuilder {
+ override def buildRepartition(
+ dynamicPartitionColumns: Seq[Attribute],
+ query: LogicalPlan): LogicalPlan = {
+ RebalancePartitions(dynamicPartitionColumns, query)
+ }
+
+ override def canInsertRepartitionByExpression(plan: LogicalPlan): Boolean = {
+ super.canInsertRepartitionByExpression(plan) && {
+ plan match {
+ case _: RebalancePartitions => false
+ case _ => true
+ }
+ }
+ }
+}
+
+/**
+ * For a datasource table, two commands can write data to the table:
+ * 1. InsertIntoHadoopFsRelationCommand
+ * 2. CreateDataSourceTableAsSelectCommand
+ * This rule adds a RebalancePartitions node between the write command and the query.
+ */
+case class RebalanceBeforeWritingDatasource(session: SparkSession)
+ extends RepartitionBeforeWritingDatasourceBase
+ with RepartitionBuilderWithRebalance {}
+
+/**
+ * For a Hive table, two commands can write data to the table:
+ * 1. InsertIntoHiveTable
+ * 2. CreateHiveTableAsSelectCommand
+ * This rule adds a RebalancePartitions node between the write command and the query.
+ */
+case class RebalanceBeforeWritingHive(session: SparkSession)
+ extends RepartitionBeforeWritingHiveBase
+ with RepartitionBuilderWithRebalance {}
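A toy illustration of the node these rules insert (the attribute and child plan are synthetic stand-ins for the dynamic partition column and the write's query):

```scala
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, RebalancePartitions}
import org.apache.spark.sql.types.StringType

// buildRepartition wraps the write's query in exactly this node,
// keyed by the dynamic partition columns (here a synthetic "c2").
val c2 = AttributeReference("c2", StringType)()
val rebalanced = RebalancePartitions(Seq(c2), OneRowRelation())
```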
diff --git a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/watchdog/ForcedMaxOutputRowsRule.scala b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/watchdog/ForcedMaxOutputRowsRule.scala
new file mode 100644
index 00000000000..a3d990b1098
--- /dev/null
+++ b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/watchdog/ForcedMaxOutputRowsRule.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.sql.watchdog
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, CommandResult, LogicalPlan, Union, WithCTE}
+import org.apache.spark.sql.execution.command.DataWritingCommand
+
+case class ForcedMaxOutputRowsRule(sparkSession: SparkSession) extends ForcedMaxOutputRowsBase {
+
+ override protected def isChildAggregate(a: Aggregate): Boolean = false
+
+ override protected def canInsertLimitInner(p: LogicalPlan): Boolean = p match {
+ case WithCTE(plan, _) => this.canInsertLimitInner(plan)
+ case plan: LogicalPlan => plan match {
+ case Union(children, _, _) => !children.exists {
+ case _: DataWritingCommand => true
+ case p: CommandResult if p.commandLogicalPlan.isInstanceOf[DataWritingCommand] => true
+ case _ => false
+ }
+ case _ => super.canInsertLimitInner(plan)
+ }
+ }
+
+ override protected def canInsertLimit(p: LogicalPlan, maxOutputRowsOpt: Option[Int]): Boolean = {
+ p match {
+ case WithCTE(plan, _) => this.canInsertLimit(plan, maxOutputRowsOpt)
+ case _ => super.canInsertLimit(p, maxOutputRowsOpt)
+ }
+ }
+}
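An illustrative sketch of the watchdog's effect (the `KyuubiSQLConf` field name is assumed from the common module's conf definitions; treat it as a placeholder if it differs):

```scala
import org.apache.kyuubi.sql.KyuubiSQLConf

// Cap ad-hoc output at 100 rows; writes (DataWritingCommand) are exempted
// by the canInsertLimitInner check above.
spark.conf.set(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS.key, "100") // assumed field name
spark.sql("SELECT * FROM big_table") // planned as if it ended with LIMIT 100
```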
diff --git a/extensions/spark/kyuubi-extension-spark-3-3/src/test/resources/log4j.properties b/extensions/spark/kyuubi-extension-spark-3-3/src/test/resources/log4j.properties
new file mode 100644
index 00000000000..68d6a0e66e2
--- /dev/null
+++ b/extensions/spark/kyuubi-extension-spark-3-3/src/test/resources/log4j.properties
@@ -0,0 +1,44 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file target/unit-tests.log
+log4j.rootLogger=INFO, CA, FA
+
+# Console Appender
+log4j.appender.CA=org.apache.log4j.ConsoleAppender
+log4j.appender.CA.layout=org.apache.log4j.PatternLayout
+log4j.appender.CA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %p %c: %m%n
+log4j.appender.CA.Threshold = FATAL
+
+# File Appender
+log4j.appender.FA=org.apache.log4j.FileAppender
+log4j.appender.FA.append=false
+log4j.appender.FA.file=target/unit-tests.log
+log4j.appender.FA.layout=org.apache.log4j.PatternLayout
+log4j.appender.FA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %t %p %c{2}: %m%n
+
+# Set the logger level of File Appender to DEBUG
+log4j.appender.FA.Threshold = DEBUG
+
+# SPARK-34128: Suppress undesirable TTransportException warnings involved in THRIFT-4805
+log4j.appender.CA.filter.1=org.apache.log4j.varia.StringMatchFilter
+log4j.appender.CA.filter.1.StringToMatch=Thrift error occurred during processing of message
+log4j.appender.CA.filter.1.AcceptOnMatch=false
+
+log4j.appender.FA.filter.1=org.apache.log4j.varia.StringMatchFilter
+log4j.appender.FA.filter.1.StringToMatch=Thrift error occurred during processing of message
+log4j.appender.FA.filter.1.AcceptOnMatch=false
diff --git a/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/DropIgnoreNonexistentSuite.scala b/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/DropIgnoreNonexistentSuite.scala
new file mode 100644
index 00000000000..ff13268948f
--- /dev/null
+++ b/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/DropIgnoreNonexistentSuite.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.plans.logical.{DropNamespace, NoopCommand}
+import org.apache.spark.sql.execution.command._
+
+import org.apache.kyuubi.sql.KyuubiSQLConf
+
+class DropIgnoreNonexistentSuite extends KyuubiSparkSQLExtensionTest {
+
+ test("drop ignore nonexistent") {
+ withSQLConf(KyuubiSQLConf.DROP_IGNORE_NONEXISTENT.key -> "true") {
+ // drop nonexistent database
+ val df1 = sql("DROP DATABASE nonexistent_database")
+ assert(df1.queryExecution.analyzed.asInstanceOf[DropNamespace].ifExists == true)
+
+ // drop nonexistent table
+ val df2 = sql("DROP TABLE nonexistent_table")
+ assert(df2.queryExecution.analyzed.isInstanceOf[NoopCommand])
+
+ // drop nonexistent view
+ val df3 = sql("DROP VIEW nonexistent_view")
+ assert(df3.queryExecution.analyzed.isInstanceOf[NoopCommand])
+
+ // drop nonexistent function
+ val df4 = sql("DROP FUNCTION nonexistent_function")
+ assert(df4.queryExecution.analyzed.isInstanceOf[NoopCommand])
+
+ // drop nonexistent PARTITION
+ withTable("test") {
+ sql("CREATE TABLE IF NOT EXISTS test(i int) PARTITIONED BY (p int)")
+ val df5 = sql("ALTER TABLE test DROP PARTITION (p = 1)")
+ assert(df5.queryExecution.analyzed
+ .asInstanceOf[AlterTableDropPartitionCommand].ifExists == true)
+ }
+ }
+ }
+}
diff --git a/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/FinalStageConfigIsolationSuite.scala b/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/FinalStageConfigIsolationSuite.scala
new file mode 100644
index 00000000000..0f983dc6c5f
--- /dev/null
+++ b/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/FinalStageConfigIsolationSuite.scala
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.execution.adaptive.{AQEShuffleReadExec, QueryStageExec}
+import org.apache.spark.sql.internal.SQLConf
+
+import org.apache.kyuubi.sql.{FinalStageConfigIsolation, KyuubiSQLConf}
+
+class FinalStageConfigIsolationSuite extends KyuubiSparkSQLExtensionTest {
+ override protected def beforeAll(): Unit = {
+ super.beforeAll()
+ setupData()
+ }
+
+ test("final stage config set reset check") {
+ withSQLConf(
+ KyuubiSQLConf.FINAL_STAGE_CONFIG_ISOLATION.key -> "true",
+ "spark.sql.finalStage.adaptive.coalescePartitions.minPartitionNum" -> "1",
+ "spark.sql.finalStage.adaptive.advisoryPartitionSizeInBytes" -> "100") {
+      // loop to double-check that final stage configs from one query do not leak into the next
+ (1 to 3).foreach { _ =>
+ sql("SELECT COUNT(*) FROM VALUES(1) as t(c)").collect()
+ assert(spark.sessionState.conf.getConfString(
+ "spark.sql.previousStage.adaptive.coalescePartitions.minPartitionNum") ===
+ FinalStageConfigIsolation.INTERNAL_UNSET_CONFIG_TAG)
+ assert(spark.sessionState.conf.getConfString(
+ "spark.sql.adaptive.coalescePartitions.minPartitionNum") ===
+ "1")
+ assert(spark.sessionState.conf.getConfString(
+ "spark.sql.finalStage.adaptive.coalescePartitions.minPartitionNum") ===
+ "1")
+
+ // 64MB
+ assert(spark.sessionState.conf.getConfString(
+ "spark.sql.previousStage.adaptive.advisoryPartitionSizeInBytes") ===
+ "67108864b")
+ assert(spark.sessionState.conf.getConfString(
+ "spark.sql.adaptive.advisoryPartitionSizeInBytes") ===
+ "100")
+ assert(spark.sessionState.conf.getConfString(
+ "spark.sql.finalStage.adaptive.advisoryPartitionSizeInBytes") ===
+ "100")
+ }
+
+ sql("SET spark.sql.adaptive.advisoryPartitionSizeInBytes=1")
+ assert(spark.sessionState.conf.getConfString(
+ "spark.sql.adaptive.advisoryPartitionSizeInBytes") ===
+ "1")
+ assert(!spark.sessionState.conf.contains(
+ "spark.sql.previousStage.adaptive.advisoryPartitionSizeInBytes"))
+
+ sql("SET a=1")
+ assert(spark.sessionState.conf.getConfString("a") === "1")
+
+ sql("RESET spark.sql.adaptive.coalescePartitions.minPartitionNum")
+ assert(!spark.sessionState.conf.contains(
+ "spark.sql.adaptive.coalescePartitions.minPartitionNum"))
+ assert(!spark.sessionState.conf.contains(
+ "spark.sql.previousStage.adaptive.coalescePartitions.minPartitionNum"))
+
+ sql("RESET a")
+ assert(!spark.sessionState.conf.contains("a"))
+ }
+ }
+
+ test("final stage config isolation") {
+ def checkPartitionNum(
+ sqlString: String,
+ previousPartitionNum: Int,
+ finalPartitionNum: Int): Unit = {
+ val df = sql(sqlString)
+ df.collect()
+ val shuffleReaders = collect(df.queryExecution.executedPlan) {
+ case customShuffleReader: AQEShuffleReadExec => customShuffleReader
+ }
+ assert(shuffleReaders.nonEmpty)
+ // reorder stage by stage id to ensure we get the right stage
+ val sortedShuffleReaders = shuffleReaders.sortWith {
+ case (s1, s2) =>
+ s1.child.asInstanceOf[QueryStageExec].id < s2.child.asInstanceOf[QueryStageExec].id
+ }
+ if (sortedShuffleReaders.length > 1) {
+ assert(sortedShuffleReaders.head.partitionSpecs.length === previousPartitionNum)
+ }
+ assert(sortedShuffleReaders.last.partitionSpecs.length === finalPartitionNum)
+ assert(df.rdd.partitions.length === finalPartitionNum)
+ }
+
+ withSQLConf(
+ SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+ SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
+ SQLConf.SHUFFLE_PARTITIONS.key -> "3",
+ KyuubiSQLConf.FINAL_STAGE_CONFIG_ISOLATION.key -> "true",
+ "spark.sql.adaptive.advisoryPartitionSizeInBytes" -> "1",
+ "spark.sql.adaptive.coalescePartitions.minPartitionSize" -> "1",
+ "spark.sql.finalStage.adaptive.advisoryPartitionSizeInBytes" -> "10000000") {
+
+      // loop to double-check that final stage configs from one query do not leak into the next
+ (1 to 3).foreach { _ =>
+ checkPartitionNum(
+ "SELECT c1, count(*) FROM t1 GROUP BY c1",
+ 1,
+ 1)
+
+ checkPartitionNum(
+ "SELECT c2, count(*) FROM (SELECT c1, count(*) as c2 FROM t1 GROUP BY c1) GROUP BY c2",
+ 3,
+ 1)
+
+ checkPartitionNum(
+ "SELECT t1.c1, count(*) FROM t1 JOIN t2 ON t1.c2 = t2.c2 GROUP BY t1.c1",
+ 3,
+ 1)
+
+ checkPartitionNum(
+ """
+ | SELECT /*+ REPARTITION */
+ | t1.c1, count(*) FROM t1
+ | JOIN t2 ON t1.c2 = t2.c2
+ | JOIN t3 ON t1.c1 = t3.c1
+ | GROUP BY t1.c1
+ |""".stripMargin,
+ 3,
+ 1)
+
+ // one shuffle reader
+ checkPartitionNum(
+ """
+ | SELECT /*+ BROADCAST(t1) */
+ | t1.c1, t2.c2 FROM t1
+ | JOIN t2 ON t1.c2 = t2.c2
+ | DISTRIBUTE BY c1
+ |""".stripMargin,
+ 1,
+ 1)
+
+ // test ReusedExchange
+ checkPartitionNum(
+ """
+ |SELECT /*+ REPARTITION */ t0.c2 FROM (
+ |SELECT t1.c1, (count(*) + c1) as c2 FROM t1 GROUP BY t1.c1
+ |) t0 JOIN (
+ |SELECT t1.c1, (count(*) + c1) as c2 FROM t1 GROUP BY t1.c1
+ |) t1 ON t0.c2 = t1.c2
+ |""".stripMargin,
+ 3,
+ 1)
+
+ // one shuffle reader
+ checkPartitionNum(
+ """
+ |SELECT t0.c1 FROM (
+ |SELECT t1.c1 FROM t1 GROUP BY t1.c1
+ |) t0 JOIN (
+ |SELECT t1.c1 FROM t1 GROUP BY t1.c1
+ |) t1 ON t0.c1 = t1.c1
+ |""".stripMargin,
+ 1,
+ 1)
+ }
+ }
+ }
+}
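The naming convention the suite exercises, condensed into one hedged snippet (keys copied from the assertions above):

```scala
import org.apache.kyuubi.sql.KyuubiSQLConf

spark.conf.set(KyuubiSQLConf.FINAL_STAGE_CONFIG_ISOLATION.key, "true")
// Takes effect only when the final query stage is planned:
spark.conf.set("spark.sql.finalStage.adaptive.advisoryPartitionSizeInBytes", "10000000")
// ...while the non-final-stage value is stashed under the previousStage prefix:
//   spark.sql.previousStage.adaptive.advisoryPartitionSizeInBytes
```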
diff --git a/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/InsertShuffleNodeBeforeJoinSuite.scala b/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/InsertShuffleNodeBeforeJoinSuite.scala
new file mode 100644
index 00000000000..f0d38465734
--- /dev/null
+++ b/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/InsertShuffleNodeBeforeJoinSuite.scala
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql
+
+class InsertShuffleNodeBeforeJoinSuite extends InsertShuffleNodeBeforeJoinSuiteBase
diff --git a/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala b/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
new file mode 100644
index 00000000000..f1a27cdb871
--- /dev/null
+++ b/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.RebalancePartitions
+import org.apache.spark.sql.hive.HiveUtils
+import org.apache.spark.sql.hive.execution.OptimizedCreateHiveTableAsSelectCommand
+
+import org.apache.kyuubi.sql.KyuubiSQLConf
+
+class RebalanceBeforeWritingSuite extends KyuubiSparkSQLExtensionTest {
+ test("check rebalance exists") {
+ def check(df: DataFrame, expectedRebalanceNum: Int = 1): Unit = {
+ assert(
+ df.queryExecution.analyzed.collect {
+ case r: RebalancePartitions => r
+ }.size == expectedRebalanceNum)
+ }
+
+    // It's better to set the config explicitly in case the default value changes.
+ withSQLConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE.key -> "true") {
+ Seq("USING PARQUET", "").foreach { storage =>
+ withTable("tmp1") {
+ sql(s"CREATE TABLE tmp1 (c1 int) $storage PARTITIONED BY (c2 string)")
+ check(sql("INSERT INTO TABLE tmp1 PARTITION(c2='a') " +
+ "SELECT * FROM VALUES(1),(2) AS t(c1)"))
+ }
+
+ withTable("tmp1", "tmp2") {
+ sql(s"CREATE TABLE tmp1 (c1 int) $storage PARTITIONED BY (c2 string)")
+ sql(s"CREATE TABLE tmp2 (c1 int) $storage PARTITIONED BY (c2 string)")
+ check(
+ sql(
+ """FROM VALUES(1),(2)
+ |INSERT INTO TABLE tmp1 PARTITION(c2='a') SELECT *
+ |INSERT INTO TABLE tmp2 PARTITION(c2='a') SELECT *
+ |""".stripMargin),
+ 2)
+ }
+
+ withTable("tmp1") {
+ sql(s"CREATE TABLE tmp1 (c1 int) $storage")
+ check(sql("INSERT INTO TABLE tmp1 SELECT * FROM VALUES(1),(2),(3) AS t(c1)"))
+ }
+
+ withTable("tmp1", "tmp2") {
+ sql(s"CREATE TABLE tmp1 (c1 int) $storage")
+ sql(s"CREATE TABLE tmp2 (c1 int) $storage")
+ check(
+ sql(
+ """FROM VALUES(1),(2),(3)
+ |INSERT INTO TABLE tmp1 SELECT *
+ |INSERT INTO TABLE tmp2 SELECT *
+ |""".stripMargin),
+ 2)
+ }
+
+ withTable("tmp1") {
+ sql(s"CREATE TABLE tmp1 $storage AS SELECT * FROM VALUES(1),(2),(3) AS t(c1)")
+ }
+
+ withTable("tmp1") {
+ sql(s"CREATE TABLE tmp1 $storage PARTITIONED BY(c2) AS " +
+ s"SELECT * FROM VALUES(1, 'a'),(2, 'b') AS t(c1, c2)")
+ }
+ }
+ }
+ }
+
+  test("check rebalance does not exist") {
+ def check(df: DataFrame): Unit = {
+ assert(
+ df.queryExecution.analyzed.collect {
+ case r: RebalancePartitions => r
+ }.isEmpty)
+ }
+
+ withSQLConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE.key -> "true") {
+ // test no write command
+ check(sql("SELECT * FROM VALUES(1, 'a'),(2, 'b') AS t(c1, c2)"))
+ check(sql("SELECT count(*) FROM VALUES(1, 'a'),(2, 'b') AS t(c1, c2)"))
+
+      // test unsupported plans
+ withTable("tmp1") {
+ sql(s"CREATE TABLE tmp1 (c1 int) PARTITIONED BY (c2 string)")
+ check(sql("INSERT INTO TABLE tmp1 PARTITION(c2) " +
+ "SELECT /*+ repartition(10) */ * FROM VALUES(1, 'a'),(2, 'b') AS t(c1, c2)"))
+ check(sql("INSERT INTO TABLE tmp1 PARTITION(c2) " +
+ "SELECT * FROM VALUES(1, 'a'),(2, 'b') AS t(c1, c2) ORDER BY c1"))
+ check(sql("INSERT INTO TABLE tmp1 PARTITION(c2) " +
+ "SELECT * FROM VALUES(1, 'a'),(2, 'b') AS t(c1, c2) LIMIT 10"))
+ }
+ }
+
+ withSQLConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE.key -> "false") {
+ Seq("USING PARQUET", "").foreach { storage =>
+ withTable("tmp1") {
+ sql(s"CREATE TABLE tmp1 (c1 int) $storage PARTITIONED BY (c2 string)")
+ check(sql("INSERT INTO TABLE tmp1 PARTITION(c2) " +
+ "SELECT * FROM VALUES(1, 'a'),(2, 'b') AS t(c1, c2)"))
+ }
+
+ withTable("tmp1") {
+ sql(s"CREATE TABLE tmp1 (c1 int) $storage")
+ check(sql("INSERT INTO TABLE tmp1 SELECT * FROM VALUES(1),(2),(3) AS t(c1)"))
+ }
+
+ withTable("tmp1") {
+ sql(s"CREATE TABLE tmp1 $storage AS SELECT * FROM VALUES(1),(2),(3) AS t(c1)")
+ }
+
+ withTable("tmp1") {
+ sql(s"CREATE TABLE tmp1 $storage PARTITIONED BY(c2) AS " +
+ s"SELECT * FROM VALUES(1, 'a'),(2, 'b') AS t(c1, c2)")
+ }
+ }
+ }
+ }
+
+ test("test dynamic partition write") {
+ def checkRepartitionExpression(df: DataFrame): Unit = {
+ assert(df.queryExecution.analyzed.collect {
+ case r: RebalancePartitions if r.partitionExpressions.size == 1 =>
+ assert(r.partitionExpressions.head.asInstanceOf[Attribute].name === "c2")
+ r
+ }.size == 1)
+ }
+
+ withSQLConf(
+ KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE.key -> "true",
+ KyuubiSQLConf.DYNAMIC_PARTITION_INSERTION_REPARTITION_NUM.key -> "2") {
+ Seq("USING PARQUET", "").foreach { storage =>
+ withTable("tmp1") {
+ sql(s"CREATE TABLE tmp1 (c1 int) $storage PARTITIONED BY (c2 string)")
+ checkRepartitionExpression(sql("INSERT INTO TABLE tmp1 SELECT 1 as c1, 'a' as c2 "))
+ }
+
+ withTable("tmp1") {
+ checkRepartitionExpression(
+ sql("CREATE TABLE tmp1 PARTITIONED BY(C2) SELECT 1 as c1, 'a' as c2 "))
+ }
+ }
+ }
+ }
+
+ test("OptimizedCreateHiveTableAsSelectCommand") {
+ withSQLConf(
+ HiveUtils.CONVERT_METASTORE_PARQUET.key -> "true",
+ HiveUtils.CONVERT_METASTORE_CTAS.key -> "true") {
+ withTable("t") {
+ val df = sql(s"CREATE TABLE t STORED AS parquet AS SELECT 1 as a")
+ val ctas = df.queryExecution.analyzed.collect {
+ case _: OptimizedCreateHiveTableAsSelectCommand => true
+ }
+ assert(ctas.size == 1)
+ val repartition = df.queryExecution.analyzed.collect {
+ case _: RebalancePartitions => true
+ }
+ assert(repartition.size == 1)
+ }
+ }
+ }
+}
diff --git a/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/WatchDogSuite.scala b/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/WatchDogSuite.scala
new file mode 100644
index 00000000000..957089340ca
--- /dev/null
+++ b/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/WatchDogSuite.scala
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+class WatchDogSuite extends WatchDogSuiteBase {}
diff --git a/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/ZorderSuite.scala b/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/ZorderSuite.scala
new file mode 100644
index 00000000000..fd04e27dbb5
--- /dev/null
+++ b/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/ZorderSuite.scala
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+class ZorderWithCodegenEnabledSuite extends ZorderWithCodegenEnabledSuiteBase {}
+
+class ZorderWithCodegenDisabledSuite extends ZorderWithCodegenDisabledSuiteBase {}
diff --git a/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxPartitionStrategy.scala b/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxPartitionStrategy.scala
index d724c7e40eb..61ab07adfb1 100644
--- a/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxPartitionStrategy.scala
+++ b/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxPartitionStrategy.scala
@@ -97,7 +97,7 @@ case class MaxPartitionStrategy(session: SparkSession)
_),
_,
_,
- _)) if fsRelation.partitionSchemaOption.isDefined =>
+ _)) if fsRelation.partitionSchema.nonEmpty =>
val (partitionKeyFilters, dataFilter) =
getPartitionKeyFiltersAndDataFilters(
fsRelation.sparkSession,
@@ -130,7 +130,7 @@ case class MaxPartitionStrategy(session: SparkSession)
_),
_,
_,
- _)) if fsRelation.partitionSchemaOption.isDefined =>
+ _)) if fsRelation.partitionSchema.nonEmpty =>
val (partitionKeyFilters, _) =
getPartitionKeyFiltersAndDataFilters(
fsRelation.sparkSession,
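For context on the two-line change above: `HadoopFsRelation.partitionSchemaOption` is gone in Spark 3.3; re-stating the pre-3.3 helper (paraphrased from older Spark sources, so treat it as an assumption) makes the equivalence of the two guards mechanical:

```scala
import org.apache.spark.sql.types.StructType

// Pre-3.3 shape of the removed helper (assumed):
def partitionSchemaOption(partitionSchema: StructType): Option[StructType] =
  if (partitionSchema.isEmpty) None else Some(partitionSchema)

// Hence partitionSchemaOption.isDefined <=> partitionSchema.nonEmpty:
val empty = StructType(Nil)
assert(partitionSchemaOption(empty).isDefined == empty.nonEmpty) // both false
```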
diff --git a/extensions/spark/kyuubi-extension-spark-common/src/test/scala/org/apache/spark/sql/InsertShuffleNodeBeforeJoinSuite.scala b/extensions/spark/kyuubi-extension-spark-common/src/test/scala/org/apache/spark/sql/InsertShuffleNodeBeforeJoinSuiteBase.scala
similarity index 97%
rename from extensions/spark/kyuubi-extension-spark-common/src/test/scala/org/apache/spark/sql/InsertShuffleNodeBeforeJoinSuite.scala
rename to extensions/spark/kyuubi-extension-spark-common/src/test/scala/org/apache/spark/sql/InsertShuffleNodeBeforeJoinSuiteBase.scala
index e66f2cee5d1..c657dee49f3 100644
--- a/extensions/spark/kyuubi-extension-spark-common/src/test/scala/org/apache/spark/sql/InsertShuffleNodeBeforeJoinSuite.scala
+++ b/extensions/spark/kyuubi-extension-spark-common/src/test/scala/org/apache/spark/sql/InsertShuffleNodeBeforeJoinSuiteBase.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.apache.kyuubi.sql.KyuubiSQLConf
-class InsertShuffleNodeBeforeJoinSuite extends KyuubiSparkSQLExtensionTest {
+trait InsertShuffleNodeBeforeJoinSuiteBase extends KyuubiSparkSQLExtensionTest {
override protected def beforeAll(): Unit = {
super.beforeAll()
setupData()
diff --git a/extensions/spark/kyuubi-extension-spark-common/src/test/scala/org/apache/spark/sql/ZorderSuite.scala b/extensions/spark/kyuubi-extension-spark-common/src/test/scala/org/apache/spark/sql/ZorderSuiteBase.scala
similarity index 99%
rename from extensions/spark/kyuubi-extension-spark-common/src/test/scala/org/apache/spark/sql/ZorderSuite.scala
rename to extensions/spark/kyuubi-extension-spark-common/src/test/scala/org/apache/spark/sql/ZorderSuiteBase.scala
index 9eea2957f90..b24533e6926 100644
--- a/extensions/spark/kyuubi-extension-spark-common/src/test/scala/org/apache/spark/sql/ZorderSuite.scala
+++ b/extensions/spark/kyuubi-extension-spark-common/src/test/scala/org/apache/spark/sql/ZorderSuiteBase.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.types._
import org.apache.kyuubi.sql.{KyuubiSQLConf, KyuubiSQLExtensionException}
import org.apache.kyuubi.sql.zorder.{OptimizeZorderCommandBase, Zorder, ZorderBytesUtils}
-trait ZorderSuite extends KyuubiSparkSQLExtensionTest with ExpressionEvalHelper {
+trait ZorderSuiteBase extends KyuubiSparkSQLExtensionTest with ExpressionEvalHelper {
override def sparkConf(): SparkConf = {
super.sparkConf()
.set(
@@ -654,7 +654,7 @@ trait ZorderSuite extends KyuubiSparkSQLExtensionTest with ExpressionEvalHelper
}
}
-class ZorderWithCodegenEnabledSuite extends ZorderSuite {
+trait ZorderWithCodegenEnabledSuiteBase extends ZorderSuiteBase {
override def sparkConf(): SparkConf = {
val conf = super.sparkConf
conf.set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "true")
@@ -662,7 +662,7 @@ class ZorderWithCodegenEnabledSuite extends ZorderSuite {
}
}
-class ZorderWithCodegenDisabledSuite extends ZorderSuite {
+trait ZorderWithCodegenDisabledSuiteBase extends ZorderSuiteBase {
override def sparkConf(): SparkConf = {
val conf = super.sparkConf
conf.set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "false")
diff --git a/pom.xml b/pom.xml
index b93fb50533a..b1b362e3da1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1943,6 +1943,7 @@
         <module>extensions/spark/kyuubi-extension-spark-common</module>
         <module>extensions/spark/kyuubi-extension-spark-3-1</module>
+        <module>extensions/spark/kyuubi-spark-authz</module>
@@ -1960,6 +1961,7 @@
         <module>extensions/spark/kyuubi-extension-spark-common</module>
+        <module>extensions/spark/kyuubi-spark-authz</module>
         <module>extensions/spark/kyuubi-extension-spark-3-2</module>
@@ -1985,6 +1987,8 @@
         org.apache.kyuubi.tags.ExtendedSQLTest,org.apache.kyuubi.tags.DeltaTest,org.apache.kyuubi.tags.IcebergTest,org.apache.kyuubi.tags.HudiTest
+        <module>extensions/spark/kyuubi-extension-spark-common</module>
+        <module>extensions/spark/kyuubi-extension-spark-3-3</module>
         <module>extensions/spark/kyuubi-spark-connector-kudu</module>