@@ -27,8 +27,8 @@ import com.codahale.metrics.{Gauge, MetricRegistry}

 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.DECOMMISSION_ENABLED
 import org.apache.spark.internal.config.Tests.TEST_SCHEDULE_INTERVAL
-import org.apache.spark.internal.config.Worker.WORKER_DECOMMISSION_ENABLED
 import org.apache.spark.metrics.source.Source
 import org.apache.spark.resource.ResourceProfile.UNKNOWN_RESOURCE_PROFILE_ID
 import org.apache.spark.resource.ResourceProfileManager
@@ -128,7 +128,7 @@ private[spark] class ExecutorAllocationManager(
   private val executorAllocationRatio =
     conf.get(DYN_ALLOCATION_EXECUTOR_ALLOCATION_RATIO)
 
-  private val decommissionEnabled = conf.get(WORKER_DECOMMISSION_ENABLED)
+  private val decommissionEnabled = conf.get(DECOMMISSION_ENABLED)
 
   private val defaultProfileId = resourceProfileManager.defaultResourceProfile.id

@@ -67,7 +67,7 @@ private[deploy] class Worker(
   assert (port > 0)
 
   // If worker decommissioning is enabled register a handler on PWR to shutdown.
-  if (conf.get(WORKER_DECOMMISSION_ENABLED)) {
+  if (conf.get(config.DECOMMISSION_ENABLED)) {
     logInfo("Registering SIGPWR handler to trigger decommissioning.")
     SignalUtils.register("PWR", "Failed to register SIGPWR handler - " +
       "disabling worker decommission feature.")(decommissionSelf)
@@ -769,7 +769,7 @@ private[deploy] class Worker(
   }
 
   private[deploy] def decommissionSelf(): Boolean = {
-    if (conf.get(WORKER_DECOMMISSION_ENABLED)) {
+    if (conf.get(config.DECOMMISSION_ENABLED)) {
       logDebug("Decommissioning self")
       decommissioned = true
       sendToMaster(WorkerDecommission(workerId, self))
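As the first Worker hunk shows, a standalone worker decommissions itself when it receives SIGPWR. For readers unfamiliar with the mechanism, below is a minimal, self-contained sketch of the kind of handler registration SignalUtils.register performs. It is an illustration built directly on the JVM's sun.misc API, not Spark's actual SignalUtils (which also chains any previously installed handler and logs the supplied failure message when registration fails), and SIGPWR only exists on some platforms, such as Linux.

import sun.misc.{Signal, SignalHandler}

// Illustrative sketch only: install a PWR handler whose action plays the
// role of Worker.decommissionSelf() in the diff above.
object PwrHandlerSketch {
  def main(args: Array[String]): Unit = {
    Signal.handle(new Signal("PWR"), new SignalHandler {
      override def handle(sig: Signal): Unit = {
        // In the real Worker, this is where decommissionSelf() runs.
        println(s"Received SIG${sig.getName}, starting graceful decommission")
      }
    })
    Thread.sleep(Long.MaxValue) // keep the JVM alive so `kill -PWR <pid>` can be observed
  }
}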
@@ -82,10 +82,4 @@ private[spark] object Worker {
.version("2.0.2")
.intConf
.createWithDefault(100)

private[spark] val WORKER_DECOMMISSION_ENABLED =
ConfigBuilder("spark.worker.decommission.enabled")
.version("3.1.0")
.booleanConf
.createWithDefault(false)
}
core/src/main/scala/org/apache/spark/internal/config/package.scala (13 additions, 0 deletions)
@@ -1866,6 +1866,19 @@ package object config {
       .timeConf(TimeUnit.MILLISECONDS)
       .createOptional
 
+  private[spark] val DECOMMISSION_ENABLED =
+    ConfigBuilder("spark.decommission.enabled")
+      .doc("When decommission enabled, Spark will try its best to shutdown the executor " +
+        s"gracefully. Spark will try to migrate all the RDD blocks (controlled by " +
+        s"${STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED.key}) and shuffle blocks (controlled by " +
+        s"${STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED.key}) from the decommissioning " +

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: some 2 spaces here ... before from

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's only one space?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. Sorry. Fonts are misleading :-)

s"executor to a remote executor when ${STORAGE_DECOMMISSION_ENABLED.key} is enabled. " +
s"With decommission enabled, Spark will also decommission an executor instead of " +
s"killing when ${DYN_ALLOCATION_ENABLED.key} enabled.")
.version("3.1.0")
.booleanConf
.createWithDefault(false)

   private[spark] val EXECUTOR_DECOMMISSION_KILL_INTERVAL =
     ConfigBuilder("spark.executor.decommission.killInterval")
       .doc("Duration after which a decommissioned executor will be killed forcefully." +
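For context, here is a minimal sketch of how an application could opt in to the renamed flag together with the storage-side settings the new doc string references. The combination shown is an illustrative assumption about a typical graceful-shutdown setup, not part of this diff; the key strings match the ConfigBuilder entries above.

import org.apache.spark.SparkConf

// Sketch: turn on graceful decommissioning plus block migration.
// "spark.decommission.enabled" is the new name; the removed
// "spark.worker.decommission.enabled" no longer exists after this change.
val conf = new SparkConf()
  .setAppName("decommission-example")
  .set("spark.decommission.enabled", "true")
  .set("spark.storage.decommission.enabled", "true")
  .set("spark.storage.decommission.rddBlocks.enabled", "true")
  .set("spark.storage.decommission.shuffleBlocks.enabled", "true")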
@@ -27,8 +27,8 @@ import org.scalatest.PrivateMethodTester

 import org.apache.spark.executor.ExecutorMetrics
 import org.apache.spark.internal.config
+import org.apache.spark.internal.config.DECOMMISSION_ENABLED
 import org.apache.spark.internal.config.Tests.TEST_SCHEDULE_INTERVAL
-import org.apache.spark.internal.config.Worker.WORKER_DECOMMISSION_ENABLED
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.resource._
 import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
@@ -1668,7 +1668,7 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
       // SPARK-22864: effectively disable the allocation schedule by setting the period to a
       // really long value.
       .set(TEST_SCHEDULE_INTERVAL, 30000L)
-      .set(WORKER_DECOMMISSION_ENABLED, decommissioningEnabled)
+      .set(DECOMMISSION_ENABLED, decommissioningEnabled)
     sparkConf
   }

@@ -57,7 +57,7 @@ class DecommissionWorkerSuite
   override def beforeEach(): Unit = {
     super.beforeEach()
     masterAndWorkerConf = new SparkConf()
-      .set(config.Worker.WORKER_DECOMMISSION_ENABLED, true)
+      .set(config.DECOMMISSION_ENABLED, true)
     masterAndWorkerSecurityManager = new SecurityManager(masterAndWorkerConf)
     masterRpcEnv = RpcEnv.create(
       Master.SYSTEM_NAME,
@@ -59,7 +59,7 @@ class AppClientSuite
    */
   override def beforeAll(): Unit = {
     super.beforeAll()
-    conf = new SparkConf().set(config.Worker.WORKER_DECOMMISSION_ENABLED.key, "true")
+    conf = new SparkConf().set(config.DECOMMISSION_ENABLED.key, "true")
     securityManager = new SecurityManager(conf)
     masterRpcEnv = RpcEnv.create(Master.SYSTEM_NAME, "localhost", 0, conf, securityManager)
     workerRpcEnvs = (0 until numWorkers).map { i =>
@@ -23,8 +23,7 @@ import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}

 import org.apache.spark.{LocalSparkContext, SparkContext, SparkFunSuite, TestUtils}
 import org.apache.spark.LocalSparkContext.withSpark
-import org.apache.spark.internal.config.{DYN_ALLOCATION_ENABLED, DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT, DYN_ALLOCATION_INITIAL_EXECUTORS, DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED}
-import org.apache.spark.internal.config.Worker.WORKER_DECOMMISSION_ENABLED
+import org.apache.spark.internal.config.{DECOMMISSION_ENABLED, DYN_ALLOCATION_ENABLED, DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT, DYN_ALLOCATION_INITIAL_EXECUTORS, DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED}
 import org.apache.spark.launcher.SparkLauncher.{EXECUTOR_MEMORY, SPARK_MASTER}
 import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend
 
@@ -37,7 +36,7 @@ class WorkerDecommissionExtendedSuite extends SparkFunSuite with LocalSparkConte
     .set(DYN_ALLOCATION_ENABLED, true)
     .set(DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED, true)
     .set(DYN_ALLOCATION_INITIAL_EXECUTORS, 5)
-    .set(WORKER_DECOMMISSION_ENABLED, true)
+    .set(DECOMMISSION_ENABLED, true)
 
   test("Worker decommission and executor idle timeout") {
     sc = new SparkContext(conf.set(DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT.key, "10s"))
@@ -32,7 +32,7 @@ class WorkerDecommissionSuite extends SparkFunSuite with LocalSparkContext {

   override def beforeEach(): Unit = {
     val conf = new SparkConf().setAppName("test").setMaster("local")
-      .set(config.Worker.WORKER_DECOMMISSION_ENABLED, true)
+      .set(config.DECOMMISSION_ENABLED, true)
 
     sc = new SparkContext("local-cluster[2, 1, 1024]", "test", conf)
   }
@@ -65,7 +65,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
     val migrateDuring = whenToDecom != JobEnded
     val master = s"local-cluster[${numExecs}, 1, 1024]"
     val conf = new SparkConf().setAppName("test").setMaster(master)
-      .set(config.Worker.WORKER_DECOMMISSION_ENABLED, true)
+      .set(config.DECOMMISSION_ENABLED, true)
       .set(config.STORAGE_DECOMMISSION_ENABLED, true)
       .set(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED, persist)
       .set(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, shuffle)
@@ -56,7 +56,7 @@ private[spark] abstract class KubernetesConf(val sparkConf: SparkConf) {
   }
 
   def workerDecommissioning: Boolean =
-    sparkConf.get(org.apache.spark.internal.config.Worker.WORKER_DECOMMISSION_ENABLED)
+    sparkConf.get(org.apache.spark.internal.config.DECOMMISSION_ENABLED)
 
   def nodeSelector: Map[String, String] =
     KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_NODE_SELECTOR_PREFIX)
@@ -22,8 +22,8 @@ import scala.util.Random

 import org.apache.spark.{ExecutorAllocationClient, SparkConf}
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.DECOMMISSION_ENABLED
 import org.apache.spark.internal.config.Streaming._
-import org.apache.spark.internal.config.Worker.WORKER_DECOMMISSION_ENABLED
 import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.scheduler.ExecutorDecommissionInfo
 import org.apache.spark.streaming.util.RecurringTimer
@@ -135,7 +135,7 @@ private[streaming] class ExecutorAllocationManager(
logDebug(s"Removable executors (${removableExecIds.size}): ${removableExecIds}")
if (removableExecIds.nonEmpty) {
val execIdToRemove = removableExecIds(Random.nextInt(removableExecIds.size))
if (conf.get(WORKER_DECOMMISSION_ENABLED)) {
if (conf.get(DECOMMISSION_ENABLED)) {
client.decommissionExecutor(execIdToRemove,
ExecutorDecommissionInfo("spark scale down", false),
adjustTargetNumExecutors = true)
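The else branch of the check above is elided by the diff. As a condensed sketch of the streaming scale-down decision (the killExecutor fallback is an assumption based on the surrounding code, not verbatim source):

// Prefer a graceful decommission over a hard kill when the flag is set.
if (conf.get(DECOMMISSION_ENABLED)) {
  client.decommissionExecutor(execIdToRemove,
    ExecutorDecommissionInfo("spark scale down", false),
    adjustTargetNumExecutors = true)
} else {
  // Assumed pre-existing fallback path when decommissioning is disabled.
  client.killExecutor(execIdToRemove)
}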
@@ -25,9 +25,8 @@ import org.scalatest.time.SpanSugar._
 import org.scalatestplus.mockito.MockitoSugar
 
 import org.apache.spark.{ExecutorAllocationClient, SparkConf}
-import org.apache.spark.internal.config.{DYN_ALLOCATION_ENABLED, DYN_ALLOCATION_TESTING}
+import org.apache.spark.internal.config.{DECOMMISSION_ENABLED, DYN_ALLOCATION_ENABLED, DYN_ALLOCATION_TESTING}
 import org.apache.spark.internal.config.Streaming._
-import org.apache.spark.internal.config.Worker.WORKER_DECOMMISSION_ENABLED
 import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.scheduler.ExecutorDecommissionInfo
 import org.apache.spark.streaming.{DummyInputDStream, Seconds, StreamingContext, TestSuiteBase}
@@ -56,7 +55,7 @@ class ExecutorAllocationManagerSuite extends TestSuiteBase
   def basicTest(decommissioning: Boolean): Unit = {
     // Test that adding batch processing time info to allocation manager
     // causes executors to be requested and killed accordingly
-    conf.set(WORKER_DECOMMISSION_ENABLED, decommissioning)
+    conf.set(DECOMMISSION_ENABLED, decommissioning)
 
     // There is 1 receiver, and exec 1 has been allocated to it
     withAllocationManager(numReceivers = 1, conf = conf) {