From 9dcbc780d2b43897473f7b5671feb24dd7fd955e Mon Sep 17 00:00:00 2001 From: zhouyifan279 Date: Wed, 9 Aug 2023 14:09:55 +0800 Subject: [PATCH 1/5] [KYUUBI #5136][Bug] Spark App may hang forever if FinalStageResourceManager killed all executors --- .../spark/kyuubi-extension-spark-3-3/pom.xml | 36 +++++++++++ .../spark/sql/FinalStageResourceManager.scala | 40 ++++++++++++- .../sql/FinalStageResourceManagerSuite.scala | 60 +++++++++++++++++++ .../spark/kyuubi-extension-spark-3-4/pom.xml | 42 +++++++++++++ .../org/apache/kyuubi/sql/KyuubiSQLConf.scala | 8 +++ .../spark/sql/FinalStageResourceManager.scala | 40 ++++++++++++- .../sql/FinalStageResourceManagerSuite.scala | 59 ++++++++++++++++++ .../org/apache/kyuubi/sql/KyuubiSQLConf.scala | 8 +++ 8 files changed, 291 insertions(+), 2 deletions(-) create mode 100644 extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/FinalStageResourceManagerSuite.scala create mode 100644 extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/FinalStageResourceManagerSuite.scala diff --git a/extensions/spark/kyuubi-extension-spark-3-3/pom.xml b/extensions/spark/kyuubi-extension-spark-3-3/pom.xml index 98c1cca0212..7934a6bd41c 100644 --- a/extensions/spark/kyuubi-extension-spark-3-3/pom.xml +++ b/extensions/spark/kyuubi-extension-spark-3-3/pom.xml @@ -37,6 +37,14 @@ ${project.version} + + org.apache.kyuubi + kyuubi-download + ${project.version} + pom + test + + org.apache.kyuubi kyuubi-extension-spark-common_${scala.binary.version} @@ -130,6 +138,34 @@ + + org.codehaus.mojo + build-helper-maven-plugin + + + regex-property + + regex-property + + + spark.home + ${project.basedir}/../../../externals/kyuubi-download/target/${spark.archive.name} + (.+)\.tgz + $1 + + + + + + org.scalatest + scalatest-maven-plugin + + + ${spark.home} + ${scala.binary.version} + + + org.apache.maven.plugins maven-shade-plugin diff --git a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/spark/sql/FinalStageResourceManager.scala b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/spark/sql/FinalStageResourceManager.scala index dc573f83867..90da2fd8020 100644 --- a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/spark/sql/FinalStageResourceManager.scala +++ b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/spark/sql/FinalStageResourceManager.scala @@ -22,6 +22,7 @@ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import org.apache.spark.{ExecutorAllocationClient, MapOutputTrackerMaster, SparkContext, SparkEnv} +import org.apache.spark.resource.ResourceProfile import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SortExec, SparkPlan} @@ -185,7 +186,12 @@ case class FinalStageResourceManager(session: SparkSession) numReduce: Int): Unit = { val executorAllocationClient = sc.schedulerBackend.asInstanceOf[ExecutorAllocationClient] - val executorsToKill = findExecutorToKill(sc, targetExecutors, shuffleId, numReduce) + val executorsToKill = + if (conf.getConf(KyuubiSQLConf.FINAL_WRITE_STAGE_EAGERLY_KILL_EXECUTORS_KILL_ALL)) { + executorAllocationClient.getExecutorIds() + } else { + findExecutorToKill(sc, targetExecutors, shuffleId, numReduce) + } logInfo(s"Request to kill executors, total count ${executorsToKill.size}, " + s"[${executorsToKill.mkString(", ")}].") if (executorsToKill.isEmpty) { @@ -210,6 +216,38 @@ case class FinalStageResourceManager(session: SparkSession) adjustTargetNumExecutors = true, countFailures = false, force = false) + + getAdjustedTargetExecutors(sc, executorAllocationClient) + .filter(_ < targetExecutors).foreach { adjustedExecutors => + val delta = targetExecutors - adjustedExecutors + logInfo(s"Target executors after kill ($adjustedExecutors) is lower than required " + + s"($targetExecutors). Requesting $delta additional executor(s).") + executorAllocationClient.requestExecutors(delta) + } + } + + private def getAdjustedTargetExecutors( + sc: SparkContext, + executorAllocationClient: ExecutorAllocationClient): Option[Int] = { + executorAllocationClient match { + case schedulerBackend: CoarseGrainedSchedulerBackend => + try { + val field = classOf[CoarseGrainedSchedulerBackend] + .getDeclaredField("requestedTotalExecutorsPerResourceProfile") + field.setAccessible(true) + schedulerBackend.synchronized { + val requestedTotalExecutorsPerResourceProfile = + field.get(schedulerBackend).asInstanceOf[mutable.HashMap[ResourceProfile, Int]] + val defaultRp = sc.resourceProfileManager.defaultResourceProfile + requestedTotalExecutorsPerResourceProfile.get(defaultRp) + } + } catch { + case e: Exception => + logWarning("Failed to get requestedTotalExecutors of Default ResourceProfile", e) + None + } + case _ => None + } } @transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq( diff --git a/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/FinalStageResourceManagerSuite.scala b/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/FinalStageResourceManagerSuite.scala new file mode 100644 index 00000000000..0440af76a7e --- /dev/null +++ b/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/FinalStageResourceManagerSuite.scala @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import org.apache.spark.SparkConf +import org.scalatest.time.{Minutes, Span} + +import org.apache.kyuubi.sql.KyuubiSQLConf + +class FinalStageResourceManagerSuite extends KyuubiSparkSQLExtensionTest { + + override def sparkConf(): SparkConf = { + // It is difficult to run spark in local-cluster mode when spark.testing is set. + sys.props.remove("spark.testing") + + super.sparkConf().set("spark.master", "local-cluster[3, 1, 1024]") + .set("spark.dynamicAllocation.enabled", "true") + .set("spark.dynamicAllocation.initialExecutors", "3") + .set("spark.dynamicAllocation.minExecutors", "1") + .set("spark.dynamicAllocation.shuffleTracking.enabled", "true") + .set(KyuubiSQLConf.FINAL_STAGE_CONFIG_ISOLATION.key, "true") + .set(KyuubiSQLConf.FINAL_WRITE_STAGE_EAGERLY_KILL_EXECUTORS_ENABLED.key, "true") + } + + + test("[KYUUBI #5136][Bug] Final Stage hangs forever") { + // Prerequisite to reproduce the bug: + // 1. Dynamic allocation is enabled. + // 2. Dynamic allocation min executors is 1. + // 3. target executors < active executors. + // 4. No active executor is left after FinalStageResourceManager killed executors. + // This is possible because FinalStageResourceManager retained executors may already be + // requested to be killed but not died yet. + // 5. Final Stage required executors is 1. + withSQLConf( + (KyuubiSQLConf.FINAL_WRITE_STAGE_EAGERLY_KILL_EXECUTORS_KILL_ALL.key, "true")) { + withTable("final_stage") { + eventually(timeout(Span(10, Minutes))) { + sql( + "CREATE TABLE final_stage AS SELECT id, count(*) as num FROM (SELECT 0 id) GROUP BY id") + } + } + } + } +} diff --git a/extensions/spark/kyuubi-extension-spark-3-4/pom.xml b/extensions/spark/kyuubi-extension-spark-3-4/pom.xml index 947c03ea025..83b73a6d601 100644 --- a/extensions/spark/kyuubi-extension-spark-3-4/pom.xml +++ b/extensions/spark/kyuubi-extension-spark-3-4/pom.xml @@ -55,6 +55,14 @@ provided + + org.apache.kyuubi + kyuubi-download + ${project.version} + pom + test + + org.apache.spark spark-core_${scala.binary.version} @@ -111,11 +119,45 @@ jakarta.xml.bind-api test + + + org.apache.logging.log4j + log4j-slf4j-impl + test + + + org.codehaus.mojo + build-helper-maven-plugin + + + regex-property + + regex-property + + + spark.home + ${project.basedir}/../../../externals/kyuubi-download/target/${spark.archive.name} + (.+)\.tgz + $1 + + + + + + org.scalatest + scalatest-maven-plugin + + + ${spark.home} + ${scala.binary.version} + + + org.antlr antlr4-maven-plugin diff --git a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala index fa118a3e28b..6f45dae126e 100644 --- a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala +++ b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala @@ -210,6 +210,14 @@ object KyuubiSQLConf { .booleanConf .createWithDefault(false) + val FINAL_WRITE_STAGE_EAGERLY_KILL_EXECUTORS_KILL_ALL = + buildConf("spark.sql.finalWriteStage.eagerlyKillExecutors.killAll") + .doc("When true, eagerly kill all executors before running final write stage. " + + "Mainly for test.") + .version("1.8.0") + .booleanConf + .createWithDefault(false) + val FINAL_WRITE_STAGE_SKIP_KILLING_EXECUTORS_FOR_TABLE_CACHE = buildConf("spark.sql.finalWriteStage.skipKillingExecutorsForTableCache") .doc("When true, skip killing executors if the plan has table caches.") diff --git a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/spark/sql/FinalStageResourceManager.scala b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/spark/sql/FinalStageResourceManager.scala index 16002dfa0fa..e9274fc4982 100644 --- a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/spark/sql/FinalStageResourceManager.scala +++ b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/spark/sql/FinalStageResourceManager.scala @@ -22,6 +22,7 @@ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import org.apache.spark.{ExecutorAllocationClient, MapOutputTrackerMaster, SparkContext, SparkEnv} +import org.apache.spark.resource.ResourceProfile import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SortExec, SparkPlan} @@ -188,7 +189,12 @@ case class FinalStageResourceManager(session: SparkSession) numReduce: Int): Unit = { val executorAllocationClient = sc.schedulerBackend.asInstanceOf[ExecutorAllocationClient] - val executorsToKill = findExecutorToKill(sc, targetExecutors, shuffleId, numReduce) + val executorsToKill = + if (conf.getConf(KyuubiSQLConf.FINAL_WRITE_STAGE_EAGERLY_KILL_EXECUTORS_KILL_ALL)) { + executorAllocationClient.getExecutorIds() + } else { + findExecutorToKill(sc, targetExecutors, shuffleId, numReduce) + } logInfo(s"Request to kill executors, total count ${executorsToKill.size}, " + s"[${executorsToKill.mkString(", ")}].") if (executorsToKill.isEmpty) { @@ -213,6 +219,38 @@ case class FinalStageResourceManager(session: SparkSession) adjustTargetNumExecutors = true, countFailures = false, force = false) + + getAdjustedTargetExecutors(sc, executorAllocationClient) + .filter(_ < targetExecutors).foreach { adjustedExecutors => + val delta = targetExecutors - adjustedExecutors + logInfo(s"Target executors after kill ($adjustedExecutors) is lower than required " + + s"($targetExecutors). Requesting $delta additional executor(s).") + executorAllocationClient.requestExecutors(delta) + } + } + + private def getAdjustedTargetExecutors( + sc: SparkContext, + executorAllocationClient: ExecutorAllocationClient): Option[Int] = { + executorAllocationClient match { + case schedulerBackend: CoarseGrainedSchedulerBackend => + try { + val field = classOf[CoarseGrainedSchedulerBackend] + .getDeclaredField("requestedTotalExecutorsPerResourceProfile") + field.setAccessible(true) + schedulerBackend.synchronized { + val requestedTotalExecutorsPerResourceProfile = + field.get(schedulerBackend).asInstanceOf[mutable.HashMap[ResourceProfile, Int]] + val defaultRp = sc.resourceProfileManager.defaultResourceProfile + requestedTotalExecutorsPerResourceProfile.get(defaultRp) + } + } catch { + case e: Exception => + logWarning("Failed to get requestedTotalExecutors of Default ResourceProfile", e) + None + } + case _ => None + } } @transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq( diff --git a/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/FinalStageResourceManagerSuite.scala b/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/FinalStageResourceManagerSuite.scala new file mode 100644 index 00000000000..45fd637adba --- /dev/null +++ b/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/FinalStageResourceManagerSuite.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import org.apache.spark.SparkConf +import org.scalatest.time.{Minutes, Span} + +import org.apache.kyuubi.sql.KyuubiSQLConf + +class FinalStageResourceManagerSuite extends KyuubiSparkSQLExtensionTest { + + override def sparkConf(): SparkConf = { + // It is difficult to run spark in local-cluster mode when spark.testing is set. + sys.props.remove("spark.testing") + + super.sparkConf().set("spark.master", "local-cluster[3, 1, 1024]") + .set("spark.dynamicAllocation.enabled", "true") + .set("spark.dynamicAllocation.initialExecutors", "3") + .set("spark.dynamicAllocation.minExecutors", "1") + .set("spark.dynamicAllocation.shuffleTracking.enabled", "true") + .set(KyuubiSQLConf.FINAL_STAGE_CONFIG_ISOLATION.key, "true") + .set(KyuubiSQLConf.FINAL_WRITE_STAGE_EAGERLY_KILL_EXECUTORS_ENABLED.key, "true") + } + + test("[KYUUBI #5136][Bug] Final Stage hangs forever") { + // Prerequisite to reproduce the bug: + // 1. Dynamic allocation is enabled. + // 2. Dynamic allocation min executors is 1. + // 3. target executors < active executors. + // 4. No active executor is left after FinalStageResourceManager killed executors. + // This is possible because FinalStageResourceManager retained executors may already be + // requested to be killed but not died yet. + // 5. Final Stage required executors is 1. + withSQLConf( + (KyuubiSQLConf.FINAL_WRITE_STAGE_EAGERLY_KILL_EXECUTORS_KILL_ALL.key, "true")) { + withTable("final_stage") { + eventually(timeout(Span(10, Minutes))) { + sql( + "CREATE TABLE final_stage AS SELECT id, count(*) as num FROM (SELECT 0 id) GROUP BY id") + } + } + } + } +} diff --git a/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala b/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala index fa118a3e28b..6f45dae126e 100644 --- a/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala +++ b/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala @@ -210,6 +210,14 @@ object KyuubiSQLConf { .booleanConf .createWithDefault(false) + val FINAL_WRITE_STAGE_EAGERLY_KILL_EXECUTORS_KILL_ALL = + buildConf("spark.sql.finalWriteStage.eagerlyKillExecutors.killAll") + .doc("When true, eagerly kill all executors before running final write stage. " + + "Mainly for test.") + .version("1.8.0") + .booleanConf + .createWithDefault(false) + val FINAL_WRITE_STAGE_SKIP_KILLING_EXECUTORS_FOR_TABLE_CACHE = buildConf("spark.sql.finalWriteStage.skipKillingExecutorsForTableCache") .doc("When true, skip killing executors if the plan has table caches.") From 12687eee7097527272f805805fc3f66dbc09dcfe Mon Sep 17 00:00:00 2001 From: zhouyifan279 Date: Mon, 14 Aug 2023 12:15:29 +0800 Subject: [PATCH 2/5] [KYUUBI #5136][Bug] Spark App may hang forever if FinalStageResourceManager killed all executors --- .../org/apache/spark/sql/FinalStageResourceManagerSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/FinalStageResourceManagerSuite.scala b/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/FinalStageResourceManagerSuite.scala index 0440af76a7e..45fd637adba 100644 --- a/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/FinalStageResourceManagerSuite.scala +++ b/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/FinalStageResourceManagerSuite.scala @@ -37,7 +37,6 @@ class FinalStageResourceManagerSuite extends KyuubiSparkSQLExtensionTest { .set(KyuubiSQLConf.FINAL_WRITE_STAGE_EAGERLY_KILL_EXECUTORS_ENABLED.key, "true") } - test("[KYUUBI #5136][Bug] Final Stage hangs forever") { // Prerequisite to reproduce the bug: // 1. Dynamic allocation is enabled. From 5f3ca1d9cefbafd4c21e7abd047b530864cee03d Mon Sep 17 00:00:00 2001 From: zhouyifan279 Date: Tue, 15 Aug 2023 15:24:20 +0800 Subject: [PATCH 3/5] [KYUUBI #5136][Bug] Spark App may hang forever if FinalStageResourceManager killed all executors --- .github/workflows/master.yml | 6 ++-- .../spark/kyuubi-extension-spark-3-3/pom.xml | 8 +++++ .../sql/FinalStageResourceManagerSuite.scala | 2 ++ .../spark/kyuubi-extension-spark-3-4/pom.xml | 8 +++++ .../sql/FinalStageResourceManagerSuite.scala | 2 ++ .../kyuubi/tags/SparkLocalClusterTest.java | 29 +++++++++++++++++++ 6 files changed, 52 insertions(+), 3 deletions(-) create mode 100644 kyuubi-util-scala/src/test/java/org/apache/kyuubi/tags/SparkLocalClusterTest.java diff --git a/.github/workflows/master.yml b/.github/workflows/master.yml index 674c7d06080..00e5772a2f5 100644 --- a/.github/workflows/master.yml +++ b/.github/workflows/master.yml @@ -59,17 +59,17 @@ jobs: - java: 8 spark: '3.3' spark-archive: '-Dspark.archive.mirror=https://archive.apache.org/dist/spark/spark-3.1.3 -Dspark.archive.name=spark-3.1.3-bin-hadoop3.2.tgz -Pzookeeper-3.6' - exclude-tags: '-Dmaven.plugin.scalatest.exclude.tags=org.scalatest.tags.Slow,org.apache.kyuubi.tags.DeltaTest,org.apache.kyuubi.tags.IcebergTest' + exclude-tags: '-Dmaven.plugin.scalatest.exclude.tags=org.scalatest.tags.Slow,org.apache.kyuubi.tags.DeltaTest,org.apache.kyuubi.tags.IcebergTest,org.apache.kyuubi.tags.SparkLocalClusterTest' comment: 'verify-on-spark-3.1-binary' - java: 8 spark: '3.3' spark-archive: '-Dspark.archive.mirror=https://archive.apache.org/dist/spark/spark-3.2.4 -Dspark.archive.name=spark-3.2.4-bin-hadoop3.2.tgz -Pzookeeper-3.6' - exclude-tags: '-Dmaven.plugin.scalatest.exclude.tags=org.scalatest.tags.Slow,org.apache.kyuubi.tags.DeltaTest,org.apache.kyuubi.tags.IcebergTest' + exclude-tags: '-Dmaven.plugin.scalatest.exclude.tags=org.scalatest.tags.Slow,org.apache.kyuubi.tags.DeltaTest,org.apache.kyuubi.tags.IcebergTest,org.apache.kyuubi.tags.SparkLocalClusterTest' comment: 'verify-on-spark-3.2-binary' - java: 8 spark: '3.3' spark-archive: '-Dspark.archive.mirror=https://archive.apache.org/dist/spark/spark-3.4.0 -Dspark.archive.name=spark-3.4.0-bin-hadoop3.tgz -Pzookeeper-3.6' - exclude-tags: '-Dmaven.plugin.scalatest.exclude.tags=org.scalatest.tags.Slow,org.apache.kyuubi.tags.DeltaTest,org.apache.kyuubi.tags.IcebergTest' + exclude-tags: '-Dmaven.plugin.scalatest.exclude.tags=org.scalatest.tags.Slow,org.apache.kyuubi.tags.DeltaTest,org.apache.kyuubi.tags.IcebergTest,org.apache.kyuubi.tags.SparkLocalClusterTest' comment: 'verify-on-spark-3.4-binary' exclude: # SPARK-33772: Spark supports JDK 17 since 3.3.0 diff --git a/extensions/spark/kyuubi-extension-spark-3-3/pom.xml b/extensions/spark/kyuubi-extension-spark-3-3/pom.xml index 7934a6bd41c..f7b7678af47 100644 --- a/extensions/spark/kyuubi-extension-spark-3-3/pom.xml +++ b/extensions/spark/kyuubi-extension-spark-3-3/pom.xml @@ -53,6 +53,14 @@ test + + org.apache.kyuubi + kyuubi-util-scala_${scala.binary.version} + ${project.version} + test-jar + test + + org.scala-lang scala-library diff --git a/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/FinalStageResourceManagerSuite.scala b/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/FinalStageResourceManagerSuite.scala index 45fd637adba..2776d46cd41 100644 --- a/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/FinalStageResourceManagerSuite.scala +++ b/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/FinalStageResourceManagerSuite.scala @@ -21,7 +21,9 @@ import org.apache.spark.SparkConf import org.scalatest.time.{Minutes, Span} import org.apache.kyuubi.sql.KyuubiSQLConf +import org.apache.kyuubi.tags.SparkLocalClusterTest +@SparkLocalClusterTest class FinalStageResourceManagerSuite extends KyuubiSparkSQLExtensionTest { override def sparkConf(): SparkConf = { diff --git a/extensions/spark/kyuubi-extension-spark-3-4/pom.xml b/extensions/spark/kyuubi-extension-spark-3-4/pom.xml index 83b73a6d601..a5501fea83a 100644 --- a/extensions/spark/kyuubi-extension-spark-3-4/pom.xml +++ b/extensions/spark/kyuubi-extension-spark-3-4/pom.xml @@ -63,6 +63,14 @@ test + + org.apache.kyuubi + kyuubi-util-scala_${scala.binary.version} + ${project.version} + test-jar + test + + org.apache.spark spark-core_${scala.binary.version} diff --git a/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/FinalStageResourceManagerSuite.scala b/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/FinalStageResourceManagerSuite.scala index 45fd637adba..2776d46cd41 100644 --- a/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/FinalStageResourceManagerSuite.scala +++ b/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/FinalStageResourceManagerSuite.scala @@ -21,7 +21,9 @@ import org.apache.spark.SparkConf import org.scalatest.time.{Minutes, Span} import org.apache.kyuubi.sql.KyuubiSQLConf +import org.apache.kyuubi.tags.SparkLocalClusterTest +@SparkLocalClusterTest class FinalStageResourceManagerSuite extends KyuubiSparkSQLExtensionTest { override def sparkConf(): SparkConf = { diff --git a/kyuubi-util-scala/src/test/java/org/apache/kyuubi/tags/SparkLocalClusterTest.java b/kyuubi-util-scala/src/test/java/org/apache/kyuubi/tags/SparkLocalClusterTest.java new file mode 100644 index 00000000000..dd718f125c3 --- /dev/null +++ b/kyuubi-util-scala/src/test/java/org/apache/kyuubi/tags/SparkLocalClusterTest.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.tags; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; +import org.scalatest.TagAnnotation; + +@TagAnnotation +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.METHOD, ElementType.TYPE}) +public @interface SparkLocalClusterTest {} From ea8f247331130fa038ea414eeb99bdf0f4adb848 Mon Sep 17 00:00:00 2001 From: zhouyifan279 Date: Tue, 15 Aug 2023 18:06:12 +0800 Subject: [PATCH 4/5] Add comment --- extensions/spark/kyuubi-extension-spark-3-3/pom.xml | 4 ++++ extensions/spark/kyuubi-extension-spark-3-4/pom.xml | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/extensions/spark/kyuubi-extension-spark-3-3/pom.xml b/extensions/spark/kyuubi-extension-spark-3-3/pom.xml index f7b7678af47..51d21f6844c 100644 --- a/extensions/spark/kyuubi-extension-spark-3-3/pom.xml +++ b/extensions/spark/kyuubi-extension-spark-3-3/pom.xml @@ -169,6 +169,10 @@ scalatest-maven-plugin + ${spark.home} ${scala.binary.version} diff --git a/extensions/spark/kyuubi-extension-spark-3-4/pom.xml b/extensions/spark/kyuubi-extension-spark-3-4/pom.xml index a5501fea83a..20db5d12f18 100644 --- a/extensions/spark/kyuubi-extension-spark-3-4/pom.xml +++ b/extensions/spark/kyuubi-extension-spark-3-4/pom.xml @@ -161,6 +161,10 @@ scalatest-maven-plugin + ${spark.home} ${scala.binary.version} From c4403eefa6c80bab4a160b747a756329b1193307 Mon Sep 17 00:00:00 2001 From: zhouyifan279 Date: Wed, 16 Aug 2023 11:33:01 +0800 Subject: [PATCH 5/5] assert adjustedTargetExecutors == 1 --- .../kyuubi/sql/KyuubiSparkSQLExtension.scala | 2 +- .../spark/sql/FinalStageResourceManager.scala | 22 ++++++++++--------- .../sql/FinalStageResourceManagerSuite.scala | 1 + .../kyuubi/sql/KyuubiSparkSQLExtension.scala | 2 +- .../spark/sql/FinalStageResourceManager.scala | 22 ++++++++++--------- .../sql/FinalStageResourceManagerSuite.scala | 1 + 6 files changed, 28 insertions(+), 22 deletions(-) diff --git a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala index 5d346422848..792315d897a 100644 --- a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala +++ b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala @@ -40,7 +40,7 @@ class KyuubiSparkSQLExtension extends (SparkSessionExtensions => Unit) { extensions.injectOptimizerRule(ForcedMaxOutputRowsRule) extensions.injectPlannerStrategy(MaxScanStrategy) - extensions.injectQueryStagePrepRule(FinalStageResourceManager) + extensions.injectQueryStagePrepRule(FinalStageResourceManager(_)) extensions.injectQueryStagePrepRule(InjectCustomResourceProfile) } } diff --git a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/spark/sql/FinalStageResourceManager.scala b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/spark/sql/FinalStageResourceManager.scala index 90da2fd8020..32fb9f5ce84 100644 --- a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/spark/sql/FinalStageResourceManager.scala +++ b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/spark/sql/FinalStageResourceManager.scala @@ -22,6 +22,7 @@ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import org.apache.spark.{ExecutorAllocationClient, MapOutputTrackerMaster, SparkContext, SparkEnv} +import org.apache.spark.internal.Logging import org.apache.spark.resource.ResourceProfile import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.sql.catalyst.rules.Rule @@ -217,7 +218,7 @@ case class FinalStageResourceManager(session: SparkSession) countFailures = false, force = false) - getAdjustedTargetExecutors(sc, executorAllocationClient) + FinalStageResourceManager.getAdjustedTargetExecutors(sc) .filter(_ < targetExecutors).foreach { adjustedExecutors => val delta = targetExecutors - adjustedExecutors logInfo(s"Target executors after kill ($adjustedExecutors) is lower than required " + @@ -226,10 +227,16 @@ case class FinalStageResourceManager(session: SparkSession) } } - private def getAdjustedTargetExecutors( - sc: SparkContext, - executorAllocationClient: ExecutorAllocationClient): Option[Int] = { - executorAllocationClient match { + @transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq( + OptimizeSkewInRebalancePartitions, + CoalesceShufflePartitions(session), + OptimizeShuffleWithLocalRead) +} + +object FinalStageResourceManager extends Logging { + + private[sql] def getAdjustedTargetExecutors(sc: SparkContext): Option[Int] = { + sc.schedulerBackend match { case schedulerBackend: CoarseGrainedSchedulerBackend => try { val field = classOf[CoarseGrainedSchedulerBackend] @@ -249,11 +256,6 @@ case class FinalStageResourceManager(session: SparkSession) case _ => None } } - - @transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq( - OptimizeSkewInRebalancePartitions, - CoalesceShufflePartitions(session), - OptimizeShuffleWithLocalRead) } trait FinalRebalanceStageHelper extends AdaptiveSparkPlanHelper { diff --git a/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/FinalStageResourceManagerSuite.scala b/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/FinalStageResourceManagerSuite.scala index 2776d46cd41..4b9991ef6f2 100644 --- a/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/FinalStageResourceManagerSuite.scala +++ b/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/FinalStageResourceManagerSuite.scala @@ -55,6 +55,7 @@ class FinalStageResourceManagerSuite extends KyuubiSparkSQLExtensionTest { sql( "CREATE TABLE final_stage AS SELECT id, count(*) as num FROM (SELECT 0 id) GROUP BY id") } + assert(FinalStageResourceManager.getAdjustedTargetExecutors(spark.sparkContext).get == 1) } } } diff --git a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala index 5d346422848..792315d897a 100644 --- a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala +++ b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala @@ -40,7 +40,7 @@ class KyuubiSparkSQLExtension extends (SparkSessionExtensions => Unit) { extensions.injectOptimizerRule(ForcedMaxOutputRowsRule) extensions.injectPlannerStrategy(MaxScanStrategy) - extensions.injectQueryStagePrepRule(FinalStageResourceManager) + extensions.injectQueryStagePrepRule(FinalStageResourceManager(_)) extensions.injectQueryStagePrepRule(InjectCustomResourceProfile) } } diff --git a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/spark/sql/FinalStageResourceManager.scala b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/spark/sql/FinalStageResourceManager.scala index e9274fc4982..81873476cc4 100644 --- a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/spark/sql/FinalStageResourceManager.scala +++ b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/spark/sql/FinalStageResourceManager.scala @@ -22,6 +22,7 @@ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import org.apache.spark.{ExecutorAllocationClient, MapOutputTrackerMaster, SparkContext, SparkEnv} +import org.apache.spark.internal.Logging import org.apache.spark.resource.ResourceProfile import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.sql.catalyst.rules.Rule @@ -220,7 +221,7 @@ case class FinalStageResourceManager(session: SparkSession) countFailures = false, force = false) - getAdjustedTargetExecutors(sc, executorAllocationClient) + FinalStageResourceManager.getAdjustedTargetExecutors(sc) .filter(_ < targetExecutors).foreach { adjustedExecutors => val delta = targetExecutors - adjustedExecutors logInfo(s"Target executors after kill ($adjustedExecutors) is lower than required " + @@ -229,10 +230,16 @@ case class FinalStageResourceManager(session: SparkSession) } } - private def getAdjustedTargetExecutors( - sc: SparkContext, - executorAllocationClient: ExecutorAllocationClient): Option[Int] = { - executorAllocationClient match { + @transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq( + OptimizeSkewInRebalancePartitions, + CoalesceShufflePartitions(session), + OptimizeShuffleWithLocalRead) +} + +object FinalStageResourceManager extends Logging { + + private[sql] def getAdjustedTargetExecutors(sc: SparkContext): Option[Int] = { + sc.schedulerBackend match { case schedulerBackend: CoarseGrainedSchedulerBackend => try { val field = classOf[CoarseGrainedSchedulerBackend] @@ -252,11 +259,6 @@ case class FinalStageResourceManager(session: SparkSession) case _ => None } } - - @transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq( - OptimizeSkewInRebalancePartitions, - CoalesceShufflePartitions(session), - OptimizeShuffleWithLocalRead) } trait FinalRebalanceStageHelper extends AdaptiveSparkPlanHelper { diff --git a/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/FinalStageResourceManagerSuite.scala b/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/FinalStageResourceManagerSuite.scala index 2776d46cd41..4b9991ef6f2 100644 --- a/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/FinalStageResourceManagerSuite.scala +++ b/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/FinalStageResourceManagerSuite.scala @@ -55,6 +55,7 @@ class FinalStageResourceManagerSuite extends KyuubiSparkSQLExtensionTest { sql( "CREATE TABLE final_stage AS SELECT id, count(*) as num FROM (SELECT 0 id) GROUP BY id") } + assert(FinalStageResourceManager.getAdjustedTargetExecutors(spark.sparkContext).get == 1) } } }