From c3e9b9b99773d84e15ed7d486bd7a31ef5f9a3bf Mon Sep 17 00:00:00 2001 From: attilapiros Date: Fri, 27 Jan 2023 13:23:38 -0800 Subject: [PATCH 1/5] Initial version. --- .../main/scala/org/apache/spark/deploy/SparkSubmit.scala | 5 +++-- .../scala/org/apache/spark/internal/config/package.scala | 8 ++++++++ 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index fa19c7918af2..1da25e3f18b4 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -1005,8 +1005,9 @@ private[spark] class SparkSubmit extends Logging { case t: Throwable => throw findCause(t) } finally { - if (args.master.startsWith("k8s") && !isShell(args.primaryResource) && - !isSqlShell(args.mainClass) && !isThriftServer(args.mainClass)) { + if (sparkConf.get(AUTO_STOP_ACTIVE_SPARK_CONTEXTS) && args.master.startsWith("k8s") && + !isShell(args.primaryResource) && !isSqlShell(args.mainClass) && + !isThriftServer(args.mainClass)) { try { SparkContext.getActive.foreach(_.stop()) } catch { diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index be210cfe59b3..d567be1e5167 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -2020,6 +2020,14 @@ package object config { .toSequence .createWithDefault(Nil) + private[spark] val AUTO_STOP_ACTIVE_SPARK_CONTEXTS = + ConfigBuilder("spark.kubernetes.submit.autoStopActiveSparkContexts") + .version("3.4.0") + .doc("When set to true, on Kubernetes Spark will stop all the active Spark contexts after " + + "the finish of non-shell applications' main method.") + .booleanConf + .createWithDefault(false) + private[spark] val SCHEDULER_ALLOCATION_FILE = ConfigBuilder("spark.scheduler.allocation.file") .version("0.8.1") From b79e3cf1e9ebd1e5f95067b8d7bef75181413836 Mon Sep 17 00:00:00 2001 From: attilapiros Date: Fri, 27 Jan 2023 15:08:44 -0800 Subject: [PATCH 2/5] fix config's version tag --- .../main/scala/org/apache/spark/internal/config/package.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index d567be1e5167..d9e942b8cc1a 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -2022,7 +2022,7 @@ package object config { private[spark] val AUTO_STOP_ACTIVE_SPARK_CONTEXTS = ConfigBuilder("spark.kubernetes.submit.autoStopActiveSparkContexts") - .version("3.4.0") + .version("3.5.0") .doc("When set to true, on Kubernetes Spark will stop all the active Spark contexts after " + "the finish of non-shell applications' main method.") .booleanConf From 7614a5a8e1414217ca4603cb853d7b3a1086532e Mon Sep 17 00:00:00 2001 From: attilapiros Date: Fri, 27 Jan 2023 15:32:27 -0800 Subject: [PATCH 3/5] Move config to the k8s module --- .../main/scala/org/apache/spark/deploy/SparkSubmit.scala | 6 +++--- .../scala/org/apache/spark/internal/config/package.scala | 8 -------- .../main/scala/org/apache/spark/deploy/k8s/Config.scala | 8 ++++++++ 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 1da25e3f18b4..0a5b6ee0fe03 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -1005,9 +1005,9 @@ private[spark] class SparkSubmit extends Logging { case t: Throwable => throw findCause(t) } finally { - if (sparkConf.get(AUTO_STOP_ACTIVE_SPARK_CONTEXTS) && args.master.startsWith("k8s") && - !isShell(args.primaryResource) && !isSqlShell(args.mainClass) && - !isThriftServer(args.mainClass)) { + if (args.master.startsWith("k8s") && !isShell(args.primaryResource) && + !isSqlShell(args.mainClass) && !isThriftServer(args.mainClass) && + sparkConf.getBoolean("spark.kubernetes.submit.autoStopActiveSparkContexts", false)) { try { SparkContext.getActive.foreach(_.stop()) } catch { diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index d9e942b8cc1a..be210cfe59b3 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -2020,14 +2020,6 @@ package object config { .toSequence .createWithDefault(Nil) - private[spark] val AUTO_STOP_ACTIVE_SPARK_CONTEXTS = - ConfigBuilder("spark.kubernetes.submit.autoStopActiveSparkContexts") - .version("3.5.0") - .doc("When set to true, on Kubernetes Spark will stop all the active Spark contexts after " + - "the finish of non-shell applications' main method.") - .booleanConf - .createWithDefault(false) - private[spark] val SCHEDULER_ALLOCATION_FILE = ConfigBuilder("spark.scheduler.allocation.file") .version("0.8.1") diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index e76351f6c025..c40abade430f 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -316,6 +316,14 @@ private[spark] object Config extends Logging { .stringConf .createOptional + val AUTO_STOP_ACTIVE_SPARK_CONTEXTS = + ConfigBuilder("spark.kubernetes.submit.autoStopActiveSparkContexts") + .version("3.5.0") + .doc("When set to true, on Kubernetes Spark will stop all the active Spark contexts after " + + "the finish of non-shell applications' main method.") + .booleanConf + .createWithDefault(false) + val KUBERNETES_SCHEDULER_NAME = ConfigBuilder("spark.kubernetes.scheduler.name") .doc("Specify the scheduler name for driver and executor pods. If " + From 72ee3cbb319382299b07f623a99ca1b5d91c9f47 Mon Sep 17 00:00:00 2001 From: attilapiros Date: Mon, 30 Jan 2023 10:33:55 -0800 Subject: [PATCH 4/5] reword the config's doc --- .../src/main/scala/org/apache/spark/deploy/k8s/Config.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index c40abade430f..20e5a392630c 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -319,8 +319,8 @@ private[spark] object Config extends Logging { val AUTO_STOP_ACTIVE_SPARK_CONTEXTS = ConfigBuilder("spark.kubernetes.submit.autoStopActiveSparkContexts") .version("3.5.0") - .doc("When set to true, on Kubernetes Spark will stop all the active Spark contexts after " + - "the finish of non-shell applications' main method.") + .doc("When set to true, Spark on Kubernetes will stop all active Spark contexts once " + + "non-shell application' main methods are finished.") .booleanConf .createWithDefault(false) From ff891aaac78450b8a7c2e38daf5009dc2582aa39 Mon Sep 17 00:00:00 2001 From: attilapiros Date: Thu, 11 May 2023 15:12:10 -0700 Subject: [PATCH 5/5] Change the default of autoStopActiveSparkContexts to true --- core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala | 2 +- .../src/main/scala/org/apache/spark/deploy/k8s/Config.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 3b58674fb4ee..5bdf80fab9bb 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -1014,7 +1014,7 @@ private[spark] class SparkSubmit extends Logging { if (args.master.startsWith("k8s") && !isShell(args.primaryResource) && !isSqlShell(args.mainClass) && !isThriftServer(args.mainClass) && !isConnectServer(args.mainClass) && - sparkConf.getBoolean("spark.kubernetes.submit.autoStopActiveSparkContexts", false)) { + sparkConf.getBoolean("spark.kubernetes.submit.autoStopActiveSparkContexts", true)) { try { SparkContext.getActive.foreach(_.stop()) } catch { diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index d3a0c30ab8ab..ad1c8f95ef76 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -323,7 +323,7 @@ private[spark] object Config extends Logging { .doc("When set to true, Spark on Kubernetes will stop all active Spark contexts once " + "non-shell application' main methods are finished.") .booleanConf - .createWithDefault(false) + .createWithDefault(true) val KUBERNETES_SCHEDULER_NAME = ConfigBuilder("spark.kubernetes.scheduler.name")