diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala index fcbcd797a8066..bd7e7174cff1d 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala @@ -226,6 +226,10 @@ private[spark] class BasicExecutorFeatureStep( val containerWithLimitCores = if (isDefaultProfile) { executorLimitCores.map { limitCores => val executorCpuLimitQuantity = new Quantity(limitCores) + if (executorCpuLimitQuantity.compareTo(executorCpuQuantity) < 0) { + throw new SparkException(s"The executor cpu request ($executorCpuQuantity) should be " + + s"less than or equal to cpu limit ($executorCpuLimitQuantity)") + } new ContainerBuilder(executorContainerWithConfVolume) .editResources() .addToLimits("cpu", executorCpuLimitQuantity) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala index 37d28c203000e..906727e49276e 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala @@ -120,6 +120,18 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { assert(error.contains("You must specify an amount for gpu")) } + test("SPARK-52933: Verify if the executor cpu request exceeds limit") { + baseConf.set(KUBERNETES_EXECUTOR_REQUEST_CORES, "2") + baseConf.set(KUBERNETES_EXECUTOR_LIMIT_CORES, "1") + val error = intercept[SparkException] { + initDefaultProfile(baseConf) + val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf), + defaultProfile) + val executor = step.configurePod(SparkPod.initialPod()) + }.getMessage() + assert(error.contains("cpu request (2) should be less than or equal to cpu limit (1)")) + } + test("basic executor pod with resources") { val fpgaResourceID = new ResourceID(SPARK_EXECUTOR_PREFIX, FPGA) val gpuExecutorResourceID = new ResourceID(SPARK_EXECUTOR_PREFIX, GPU)