diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala index 6a26df2997fd2..2c82c378ac935 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala @@ -41,7 +41,10 @@ private[spark] class BasicExecutorFeatureStep( .getOrElse(throw new SparkException("Must specify the executor container image")) private val blockManagerPort = kubernetesConf .sparkConf - .getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT) + .getInt(BLOCK_MANAGER_PORT.key, DEFAULT_BLOCKMANAGER_PORT) + + require(blockManagerPort == 0 || (1024 <= blockManagerPort && blockManagerPort < 65536), + "port number must be 0 or in [1024, 65535]") private val executorPodNamePrefix = kubernetesConf.resourceNamePrefix @@ -151,14 +154,17 @@ private[spark] class BasicExecutorFeatureStep( } } - val requiredPorts = Seq( - (BLOCK_MANAGER_PORT_NAME, blockManagerPort)) - .map { case (name, port) => - new ContainerPortBuilder() - .withName(name) - .withContainerPort(port) - .build() - } + // 0 is invalid as kubernetes containerPort request, we shall leave it unmounted + val requiredPorts = if (blockManagerPort != 0) { + Seq( + (BLOCK_MANAGER_PORT_NAME, blockManagerPort)) + .map { case (name, port) => + new ContainerPortBuilder() + .withName(name) + .withContainerPort(port) + .build() + } + } else Nil val executorContainer = new ContainerBuilder(pod.container) .withName(Option(pod.container.getName).getOrElse(DEFAULT_EXECUTOR_CONTAINER_NAME)) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala index da50372d04c73..92d46426ac5f7 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala @@ -239,6 +239,36 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { executor.container, SecurityManager.ENV_AUTH_SECRET)) } + test("SPARK-35482: use correct block manager port for executor pods") { + try { + val initPod = SparkPod.initialPod() + val sm = new SecurityManager(baseConf) + val step1 = + new BasicExecutorFeatureStep(newExecutorConf(), sm) + val containerPort1 = step1.configurePod(initPod).container.getPorts.get(0) + assert(containerPort1.getContainerPort === DEFAULT_BLOCKMANAGER_PORT, + s"should use port no. $DEFAULT_BLOCKMANAGER_PORT as default") + + baseConf.set(BLOCK_MANAGER_PORT, 12345) + val step2 = new BasicExecutorFeatureStep(newExecutorConf(), sm) + val containerPort2 = step2.configurePod(initPod).container.getPorts.get(0) + assert(containerPort2.getContainerPort === 12345) + + baseConf.set(BLOCK_MANAGER_PORT, 1000) + val e = intercept[IllegalArgumentException] { + new BasicExecutorFeatureStep(newExecutorConf(), sm) + } + assert(e.getMessage.contains("port number must be 0 or in [1024, 65535]")) + + baseConf.set(BLOCK_MANAGER_PORT, 0) + val step3 = new BasicExecutorFeatureStep(newExecutorConf(), sm) + assert(step3.configurePod(initPod).container.getPorts.isEmpty, "random port") + } finally { + baseConf.remove(BLOCK_MANAGER_PORT) + } + } + + // There is always exactly one controller reference, and it points to the driver pod. private def checkOwnerReferences(executor: Pod, driverPodUid: String): Unit = { assert(executor.getMetadata.getOwnerReferences.size() === 1)