diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 1cb840f2fe80..2d8beefdc799 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -27,8 +27,8 @@ import com.codahale.metrics.{Gauge, MetricRegistry}
 
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.DECOMMISSION_ENABLED
 import org.apache.spark.internal.config.Tests.TEST_SCHEDULE_INTERVAL
-import org.apache.spark.internal.config.Worker.WORKER_DECOMMISSION_ENABLED
 import org.apache.spark.metrics.source.Source
 import org.apache.spark.resource.ResourceProfile.UNKNOWN_RESOURCE_PROFILE_ID
 import org.apache.spark.resource.ResourceProfileManager
@@ -128,7 +128,7 @@ private[spark] class ExecutorAllocationManager(
 
   private val executorAllocationRatio = conf.get(DYN_ALLOCATION_EXECUTOR_ALLOCATION_RATIO)
 
-  private val decommissionEnabled = conf.get(WORKER_DECOMMISSION_ENABLED)
+  private val decommissionEnabled = conf.get(DECOMMISSION_ENABLED)
 
   private val defaultProfileId = resourceProfileManager.defaultResourceProfile.id
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 862e685c2dce..7649bc37c30b 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -67,7 +67,7 @@ private[deploy] class Worker(
   assert (port > 0)
 
   // If worker decommissioning is enabled register a handler on PWR to shutdown.
-  if (conf.get(WORKER_DECOMMISSION_ENABLED)) {
+  if (conf.get(config.DECOMMISSION_ENABLED)) {
     logInfo("Registering SIGPWR handler to trigger decommissioning.")
     SignalUtils.register("PWR", "Failed to register SIGPWR handler - " +
       "disabling worker decommission feature.")(decommissionSelf)
@@ -769,7 +769,7 @@ private[deploy] class Worker(
   }
 
   private[deploy] def decommissionSelf(): Boolean = {
-    if (conf.get(WORKER_DECOMMISSION_ENABLED)) {
+    if (conf.get(config.DECOMMISSION_ENABLED)) {
       logDebug("Decommissioning self")
       decommissioned = true
       sendToMaster(WorkerDecommission(workerId, self))
diff --git a/core/src/main/scala/org/apache/spark/internal/config/Worker.scala b/core/src/main/scala/org/apache/spark/internal/config/Worker.scala
index 52fcafad8b49..a8072712c46c 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/Worker.scala
@@ -82,10 +82,4 @@ private[spark] object Worker {
       .version("2.0.2")
       .intConf
       .createWithDefault(100)
-
-  private[spark] val WORKER_DECOMMISSION_ENABLED =
-    ConfigBuilder("spark.worker.decommission.enabled")
-      .version("3.1.0")
-      .booleanConf
-      .createWithDefault(false)
 }
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 34acf9f9b30c..b308115935d6 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1866,6 +1866,19 @@ package object config {
       .timeConf(TimeUnit.MILLISECONDS)
       .createOptional
 
+  private[spark] val DECOMMISSION_ENABLED =
+    ConfigBuilder("spark.decommission.enabled")
+      .doc("When decommission is enabled, Spark will try its best to shut down the executor " +
+        s"gracefully. Spark will try to migrate all the RDD blocks (controlled by " +
+        s"${STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED.key}) and shuffle blocks (controlled by " +
+        s"${STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED.key}) from the decommissioning " +
+        s"executor to a remote executor when ${STORAGE_DECOMMISSION_ENABLED.key} is enabled. " +
+        s"With decommission enabled, Spark will also decommission an executor instead of " +
+        s"killing it when ${DYN_ALLOCATION_ENABLED.key} is enabled.")
+      .version("3.1.0")
+      .booleanConf
+      .createWithDefault(false)
+
   private[spark] val EXECUTOR_DECOMMISSION_KILL_INTERVAL =
     ConfigBuilder("spark.executor.decommission.killInterval")
       .doc("Duration after which a decommissioned executor will be killed forcefully." +
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 3abe051e4708..3f8cbf59bf52 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -27,8 +27,8 @@ import org.scalatest.PrivateMethodTester
 
 import org.apache.spark.executor.ExecutorMetrics
 import org.apache.spark.internal.config
+import org.apache.spark.internal.config.DECOMMISSION_ENABLED
 import org.apache.spark.internal.config.Tests.TEST_SCHEDULE_INTERVAL
-import org.apache.spark.internal.config.Worker.WORKER_DECOMMISSION_ENABLED
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.resource._
 import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
@@ -1668,7 +1668,7 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
       // SPARK-22864: effectively disable the allocation schedule by setting the period to a
       // really long value.
       .set(TEST_SCHEDULE_INTERVAL, 30000L)
-      .set(WORKER_DECOMMISSION_ENABLED, decommissioningEnabled)
+      .set(DECOMMISSION_ENABLED, decommissioningEnabled)
     sparkConf
   }
diff --git a/core/src/test/scala/org/apache/spark/deploy/DecommissionWorkerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/DecommissionWorkerSuite.scala
index 90b77a21ad02..9c5e46085405 100644
--- a/core/src/test/scala/org/apache/spark/deploy/DecommissionWorkerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/DecommissionWorkerSuite.scala
@@ -57,7 +57,7 @@ class DecommissionWorkerSuite
   override def beforeEach(): Unit = {
     super.beforeEach()
     masterAndWorkerConf = new SparkConf()
-      .set(config.Worker.WORKER_DECOMMISSION_ENABLED, true)
+      .set(config.DECOMMISSION_ENABLED, true)
     masterAndWorkerSecurityManager = new SecurityManager(masterAndWorkerConf)
     masterRpcEnv = RpcEnv.create(
       Master.SYSTEM_NAME,
diff --git a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
index e091bd05c2dc..85ad4bdb3ec1 100644
--- a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
@@ -59,7 +59,7 @@ class AppClientSuite
    */
   override def beforeAll(): Unit = {
     super.beforeAll()
-    conf = new SparkConf().set(config.Worker.WORKER_DECOMMISSION_ENABLED.key, "true")
+    conf = new SparkConf().set(config.DECOMMISSION_ENABLED.key, "true")
     securityManager = new SecurityManager(conf)
     masterRpcEnv = RpcEnv.create(Master.SYSTEM_NAME, "localhost", 0, conf, securityManager)
     workerRpcEnvs = (0 until numWorkers).map { i =>
diff --git a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionExtendedSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionExtendedSuite.scala
index 6bfd3f72e632..4264d45b36f2 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionExtendedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionExtendedSuite.scala
@@ -23,8 +23,7 @@ import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}
 
 import org.apache.spark.{LocalSparkContext, SparkContext, SparkFunSuite, TestUtils}
 import org.apache.spark.LocalSparkContext.withSpark
-import org.apache.spark.internal.config.{DYN_ALLOCATION_ENABLED, DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT, DYN_ALLOCATION_INITIAL_EXECUTORS, DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED}
-import org.apache.spark.internal.config.Worker.WORKER_DECOMMISSION_ENABLED
+import org.apache.spark.internal.config.{DECOMMISSION_ENABLED, DYN_ALLOCATION_ENABLED, DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT, DYN_ALLOCATION_INITIAL_EXECUTORS, DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED}
 import org.apache.spark.launcher.SparkLauncher.{EXECUTOR_MEMORY, SPARK_MASTER}
 import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend
@@ -37,7 +36,7 @@ class WorkerDecommissionExtendedSuite extends SparkFunSuite with LocalSparkConte
     .set(DYN_ALLOCATION_ENABLED, true)
     .set(DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED, true)
     .set(DYN_ALLOCATION_INITIAL_EXECUTORS, 5)
-    .set(WORKER_DECOMMISSION_ENABLED, true)
+    .set(DECOMMISSION_ENABLED, true)
 
   test("Worker decommission and executor idle timeout") {
     sc = new SparkContext(conf.set(DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT.key, "10s"))
diff --git a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala
index ea5be21d16d8..1ccb53f32dc2 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala
@@ -32,7 +32,7 @@ class WorkerDecommissionSuite extends SparkFunSuite with LocalSparkContext {
 
   override def beforeEach(): Unit = {
     val conf = new SparkConf().setAppName("test").setMaster("local")
-      .set(config.Worker.WORKER_DECOMMISSION_ENABLED, true)
+      .set(config.DECOMMISSION_ENABLED, true)
 
     sc = new SparkContext("local-cluster[2, 1, 1024]", "test", conf)
   }
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
index 82f87a5b58b4..37836a9b4904 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
@@ -65,7 +65,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
     val migrateDuring = whenToDecom != JobEnded
     val master = s"local-cluster[${numExecs}, 1, 1024]"
     val conf = new SparkConf().setAppName("test").setMaster(master)
-      .set(config.Worker.WORKER_DECOMMISSION_ENABLED, true)
+      .set(config.DECOMMISSION_ENABLED, true)
       .set(config.STORAGE_DECOMMISSION_ENABLED, true)
       .set(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED, persist)
       .set(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, shuffle)
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
index 6bd7fa81c0e3..f3e492e8efad 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
@@ -56,7 +56,7 @@ private[spark] abstract class KubernetesConf(val sparkConf: SparkConf) {
   }
 
   def workerDecommissioning: Boolean =
-    sparkConf.get(org.apache.spark.internal.config.Worker.WORKER_DECOMMISSION_ENABLED)
+    sparkConf.get(org.apache.spark.internal.config.DECOMMISSION_ENABLED)
 
   def nodeSelector: Map[String, String] =
     KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_NODE_SELECTOR_PREFIX)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala
index a4b7b7a28fd4..8f74d2d9959d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala
@@ -22,8 +22,8 @@ import scala.util.Random
 
 import org.apache.spark.{ExecutorAllocationClient, SparkConf}
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.DECOMMISSION_ENABLED
 import org.apache.spark.internal.config.Streaming._
-import org.apache.spark.internal.config.Worker.WORKER_DECOMMISSION_ENABLED
 import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.scheduler.ExecutorDecommissionInfo
 import org.apache.spark.streaming.util.RecurringTimer
@@ -135,7 +135,7 @@ private[streaming] class ExecutorAllocationManager(
     logDebug(s"Removable executors (${removableExecIds.size}): ${removableExecIds}")
     if (removableExecIds.nonEmpty) {
       val execIdToRemove = removableExecIds(Random.nextInt(removableExecIds.size))
-      if (conf.get(WORKER_DECOMMISSION_ENABLED)) {
+      if (conf.get(DECOMMISSION_ENABLED)) {
         client.decommissionExecutor(execIdToRemove,
           ExecutorDecommissionInfo("spark scale down", false),
           adjustTargetNumExecutors = true)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
index 9e06625371ae..ec3ff456b8ea 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
@@ -25,9 +25,8 @@ import org.scalatest.time.SpanSugar._
 import org.scalatestplus.mockito.MockitoSugar
 
 import org.apache.spark.{ExecutorAllocationClient, SparkConf}
-import org.apache.spark.internal.config.{DYN_ALLOCATION_ENABLED, DYN_ALLOCATION_TESTING}
+import org.apache.spark.internal.config.{DECOMMISSION_ENABLED, DYN_ALLOCATION_ENABLED, DYN_ALLOCATION_TESTING}
 import org.apache.spark.internal.config.Streaming._
-import org.apache.spark.internal.config.Worker.WORKER_DECOMMISSION_ENABLED
 import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.scheduler.ExecutorDecommissionInfo
 import org.apache.spark.streaming.{DummyInputDStream, Seconds, StreamingContext, TestSuiteBase}
@@ -56,7 +55,7 @@ class ExecutorAllocationManagerSuite extends TestSuiteBase
   def basicTest(decommissioning: Boolean): Unit = {
     // Test that adding batch processing time info to allocation manager
     // causes executors to be requested and killed accordingly
-    conf.set(WORKER_DECOMMISSION_ENABLED, decommissioning)
+    conf.set(DECOMMISSION_ENABLED, decommissioning)
 
     // There is 1 receiver, and exec 1 has been allocated to it
     withAllocationManager(numReceivers = 1, conf = conf) {
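
Taken together, the patch is a pure rename: every read of `WORKER_DECOMMISSION_ENABLED` becomes a read of the new top-level `DECOMMISSION_ENABLED` entry, and the old `spark.worker.decommission.enabled` key (never shipped in a release; both configs carry version 3.1.0) is deleted outright rather than deprecated. As a quick illustration of the user-facing result, here is a minimal sketch of an application opting in through the renamed key. It is not part of the patch; the app name is a placeholder, the master URL mirrors the `local-cluster` setup the suites above use, and the three `spark.storage.decommission.*` keys are the existing configs referenced in the new doc string:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Minimal sketch: enable graceful decommissioning under the renamed key.
// "spark.decommission.enabled" replaces the old "spark.worker.decommission.enabled".
val conf = new SparkConf()
  .setAppName("decommission-demo")           // placeholder app name
  .setMaster("local-cluster[2, 1, 1024]")    // same master the suites above use
  .set("spark.decommission.enabled", "true")
  // Optionally also migrate cached RDD and shuffle blocks off the decommissioning
  // executor, as described in the new config's doc string.
  .set("spark.storage.decommission.enabled", "true")
  .set("spark.storage.decommission.rddBlocks.enabled", "true")
  .set("spark.storage.decommission.shuffleBlocks.enabled", "true")

val sc = new SparkContext(conf)
```

With dynamic allocation also enabled, a scale-down then goes through `client.decommissionExecutor(...)` rather than an outright kill, which is exactly the branch exercised in the streaming `ExecutorAllocationManager` change above.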