From 0a791728f462aa7a3d4d6a661c427495fc84011b Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Tue, 14 Oct 2025 13:53:55 -0700
Subject: [PATCH 1/2] [SPARK-53907][K8S] Support `spark.kubernetes.allocation.maximum`

---
 .../org/apache/spark/deploy/k8s/Config.scala       |  8 ++++++++
 .../cluster/k8s/ExecutorPodsAllocator.scala        |  7 +++++++
 .../cluster/k8s/ExecutorPodsAllocatorSuite.scala   | 15 +++++++++++++++
 3 files changed, 30 insertions(+)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index a56043b4912f2..8be78240e34f7 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -479,6 +479,14 @@ private[spark] object Config extends Logging {
       .checkValue(value => value > 100, "Allocation batch delay must be greater than 0.1s.")
       .createWithDefaultString("1s")
 
+  val KUBERNETES_ALLOCATION_MAXIMUM =
+    ConfigBuilder("spark.kubernetes.allocation.maximum")
+      .doc("The maximum number of executor pods to try to create during the whole job lifecycle.")
+      .version("4.1.0")
+      .intConf
+      .checkValue(value => value > 0, "Allocation maximum should be a positive integer")
+      .createWithDefault(Int.MaxValue)
+
   val KUBERNETES_ALLOCATION_DRIVER_READINESS_TIMEOUT =
     ConfigBuilder("spark.kubernetes.allocation.driver.readinessTimeout")
       .doc("Time to wait for driver pod to get ready before creating executor pods. This wait " +
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
index 9a385dca0f42d..0a2b3db0b77b0 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
@@ -48,6 +48,8 @@ class ExecutorPodsAllocator(
 
   protected val EXECUTOR_ID_COUNTER = new AtomicInteger(0)
 
+  protected val MAXIMUM = conf.get(KUBERNETES_ALLOCATION_MAXIMUM)
+
   protected val PVC_COUNTER = new AtomicInteger(0)
 
   protected val maxPVCs = if (Utils.isDynamicAllocationEnabled(conf)) {
@@ -69,6 +71,8 @@ class ExecutorPodsAllocator(
 
   protected val podAllocationDelay = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY)
 
+  protected val podAllocationMaximum = conf.get(KUBERNETES_ALLOCATION_MAXIMUM)
+
   protected val maxPendingPods = conf.get(KUBERNETES_MAX_PENDING_PODS)
 
   protected val podCreationTimeout = math.max(
@@ -426,6 +430,9 @@
         return
       }
       val newExecutorId = EXECUTOR_ID_COUNTER.incrementAndGet()
+      if (newExecutorId >= MAXIMUM) {
+        throw new SparkException(s"Exceed the pod creation limit: $MAXIMUM")
+      }
       val executorConf = KubernetesConf.createExecutorConf(
         conf,
         newExecutorId.toString,
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
index a95c93724fe47..755883efce118 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
@@ -249,6 +249,21 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
       .resource(podWithAttachedContainerForId(podAllocationSize + 1))
   }
 
+  test("SPARK-53907: Support spark.kubernetes.allocation.maximum") {
+    val confWithAllocationMaximum = conf.clone.set(KUBERNETES_ALLOCATION_MAXIMUM.key, "1")
+    podsAllocatorUnderTest = new ExecutorPodsAllocator(confWithAllocationMaximum, secMgr,
+      executorBuilder, kubernetesClient, snapshotsStore, waitForExecutorPodsClock)
+    podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend)
+
+    val counter = PrivateMethod[AtomicInteger](Symbol("EXECUTOR_ID_COUNTER"))()
+    assert(podsAllocatorUnderTest.invokePrivate(counter).get() === 0)
+
+    val m = intercept[SparkException] {
+      podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 2))
+    }.getMessage
+    assert(m.contains("Exceed the pod creation limit: 1"))
+  }
+
   test("Request executors in batches. Allow another batch to be requested if" +
     " all pending executors start running.") {
     val counter = PrivateMethod[AtomicInteger](Symbol("EXECUTOR_ID_COUNTER"))()

From f9161d1c91df50377759b55f49b2e42c49549da6 Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Tue, 14 Oct 2025 14:48:12 -0700
Subject: [PATCH 2/2] Remove redundant variable

---
 .../spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
index 0a2b3db0b77b0..48e48cc3fbc70 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
@@ -48,8 +48,6 @@ class ExecutorPodsAllocator(
 
   protected val EXECUTOR_ID_COUNTER = new AtomicInteger(0)
 
-  protected val MAXIMUM = conf.get(KUBERNETES_ALLOCATION_MAXIMUM)
-
   protected val PVC_COUNTER = new AtomicInteger(0)
 
   protected val maxPVCs = if (Utils.isDynamicAllocationEnabled(conf)) {
@@ -430,8 +428,8 @@
         return
       }
       val newExecutorId = EXECUTOR_ID_COUNTER.incrementAndGet()
-      if (newExecutorId >= MAXIMUM) {
-        throw new SparkException(s"Exceed the pod creation limit: $MAXIMUM")
+      if (newExecutorId >= podAllocationMaximum) {
+        throw new SparkException(s"Exceed the pod creation limit: $podAllocationMaximum")
       }
       val executorConf = KubernetesConf.createExecutorConf(
         conf,
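
For illustration only: a minimal sketch of how a job could opt in to the new limit from application code once this change is available, assuming a standard SparkSession entry point; the application name and the value 1000 are hypothetical, while the config key comes from the patch above.

    import org.apache.spark.sql.SparkSession

    // Hypothetical usage: cap the total number of executor pods created over the
    // whole job lifecycle at 1000. Once the executor ID counter reaches the cap,
    // ExecutorPodsAllocator throws a SparkException instead of requesting more pods.
    val spark = SparkSession.builder()
      .appName("allocation-maximum-demo")
      .config("spark.kubernetes.allocation.maximum", "1000")
      .getOrCreate()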