diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index a56043b4912f2..8be78240e34f7 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -479,6 +479,14 @@ private[spark] object Config extends Logging {
       .checkValue(value => value > 100, "Allocation batch delay must be greater than 0.1s.")
       .createWithDefaultString("1s")
 
+  val KUBERNETES_ALLOCATION_MAXIMUM =
+    ConfigBuilder("spark.kubernetes.allocation.maximum")
+      .doc("The maximum number of executor pods to try to create during the whole job lifecycle.")
+      .version("4.1.0")
+      .intConf
+      .checkValue(value => value > 0, "Allocation maximum should be a positive integer")
+      .createWithDefault(Int.MaxValue)
+
   val KUBERNETES_ALLOCATION_DRIVER_READINESS_TIMEOUT =
     ConfigBuilder("spark.kubernetes.allocation.driver.readinessTimeout")
       .doc("Time to wait for driver pod to get ready before creating executor pods. This wait " +
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
index 9a385dca0f42d..48e48cc3fbc70 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
@@ -69,6 +69,8 @@ class ExecutorPodsAllocator(
 
   protected val podAllocationDelay = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY)
 
+  protected val podAllocationMaximum = conf.get(KUBERNETES_ALLOCATION_MAXIMUM)
+
   protected val maxPendingPods = conf.get(KUBERNETES_MAX_PENDING_PODS)
 
   protected val podCreationTimeout = math.max(
@@ -426,6 +428,9 @@ class ExecutorPodsAllocator(
         return
       }
       val newExecutorId = EXECUTOR_ID_COUNTER.incrementAndGet()
+      if (newExecutorId > podAllocationMaximum) {
+        throw new SparkException(s"Exceed the pod creation limit: $podAllocationMaximum")
+      }
       val executorConf = KubernetesConf.createExecutorConf(
         conf,
         newExecutorId.toString,
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
index a95c93724fe47..755883efce118 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
@@ -249,6 +249,21 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
       .resource(podWithAttachedContainerForId(podAllocationSize + 1))
   }
 
+  test("SPARK-53907: Support spark.kubernetes.allocation.maximum") {
+    val confWithAllocationMaximum = conf.clone.set(KUBERNETES_ALLOCATION_MAXIMUM.key, "1")
+    podsAllocatorUnderTest = new ExecutorPodsAllocator(confWithAllocationMaximum, secMgr,
+      executorBuilder, kubernetesClient, snapshotsStore, waitForExecutorPodsClock)
+    podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend)
+
+    val counter = PrivateMethod[AtomicInteger](Symbol("EXECUTOR_ID_COUNTER"))()
+    assert(podsAllocatorUnderTest.invokePrivate(counter).get() === 0)
+
+    val m = intercept[SparkException] {
+      podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 2))
+    }.getMessage
+    assert(m.contains("Exceed the pod creation limit: 1"))
+  }
+
   test("Request executors in batches. Allow another batch to be requested if" +
     " all pending executors start running.") {
     val counter = PrivateMethod[AtomicInteger](Symbol("EXECUTOR_ID_COUNTER"))()
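
To make the semantics of the new limit easier to review, here is a minimal, self-contained sketch of the guard in `ExecutorPodsAllocator.requestNewExecutors`. The object and method names are illustrative only, and `IllegalStateException` stands in for `org.apache.spark.SparkException` so the snippet runs without Spark on the classpath:

```scala
import java.util.concurrent.atomic.AtomicInteger

object AllocationLimitSketch {
  // Mirrors EXECUTOR_ID_COUNTER: executor IDs start at 1 and only grow.
  private val executorIdCounter = new AtomicInteger(0)

  // Because the guard keys off the monotonically increasing executor ID
  // rather than a count of live pods, the limit bounds every pod ever
  // requested (including replacements for failed executors), matching the
  // "whole job lifecycle" wording in the config doc.
  def requestNewExecutor(podAllocationMaximum: Int): Int = {
    val newExecutorId = executorIdCounter.incrementAndGet()
    if (newExecutorId > podAllocationMaximum) {
      throw new IllegalStateException(s"Exceed the pod creation limit: $podAllocationMaximum")
    }
    newExecutorId
  }

  def main(args: Array[String]): Unit = {
    println(requestNewExecutor(2)) // 1: first pod allowed
    println(requestNewExecutor(2)) // 2: second pod allowed
    try requestNewExecutor(2)      // third attempt exceeds the limit
    catch { case e: IllegalStateException => println(e.getMessage) }
  }
}
```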
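Assuming the usual `SparkConf` plumbing (only `spark.kubernetes.allocation.maximum` is introduced by this patch; the master URL below is ordinary Spark-on-K8s configuration), a user could cap pod creation like this, e.g. from the spark-shell:

```scala
import org.apache.spark.SparkConf

// Fail fast after 1000 executor-pod creations instead of letting a
// crash-looping job churn pods indefinitely.
val conf = new SparkConf()
  .set("spark.master", "k8s://https://kubernetes.default.svc")
  .set("spark.kubernetes.allocation.maximum", "1000")
```

With the default of `Int.MaxValue`, existing jobs keep today's effectively unbounded behavior.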