Skip to content

Commit 66d1e62

Browse files
Adelbert ChangAdelbert Chang
authored andcommitted
[SPARK-24960][K8S] explicitly expose ports on driver container
1 parent 58353d7 commit 66d1e62

File tree

3 files changed

+40
-1
lines changed

3 files changed

+40
-1
lines changed

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ private[spark] object Constants {
4747
val DEFAULT_BLOCKMANAGER_PORT = 7079
4848
val DRIVER_PORT_NAME = "driver-rpc-port"
4949
val BLOCK_MANAGER_PORT_NAME = "blockmanager"
50+
val UI_PORT_NAME = "spark-ui"
5051

5152
// Environment Variables
5253
val ENV_DRIVER_URL = "SPARK_DRIVER_URL"

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import org.apache.spark.deploy.k8s.Config._
2727
import org.apache.spark.deploy.k8s.Constants._
2828
import org.apache.spark.deploy.k8s.submit._
2929
import org.apache.spark.internal.config._
30+
import org.apache.spark.ui.SparkUI
3031

3132
private[spark] class BasicDriverFeatureStep(
3233
conf: KubernetesConf[KubernetesDriverSpecificConf])
@@ -72,10 +73,31 @@ private[spark] class BasicDriverFeatureStep(
7273
("cpu", new QuantityBuilder(false).withAmount(limitCores).build())
7374
}
7475

76+
val driverPort = conf.sparkConf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT)
77+
val driverBlockManagerPort = conf.sparkConf.getInt(
78+
DRIVER_BLOCK_MANAGER_PORT.key,
79+
DEFAULT_BLOCKMANAGER_PORT
80+
)
81+
val driverUIPort = SparkUI.getUIPort(conf.sparkConf)
7582
val driverContainer = new ContainerBuilder(pod.container)
7683
.withName(DRIVER_CONTAINER_NAME)
7784
.withImage(driverContainerImage)
7885
.withImagePullPolicy(conf.imagePullPolicy())
86+
.addNewPort()
87+
.withName(DRIVER_PORT_NAME)
88+
.withContainerPort(driverPort)
89+
.withProtocol("TCP")
90+
.endPort()
91+
.addNewPort()
92+
.withName(BLOCK_MANAGER_PORT_NAME)
93+
.withContainerPort(driverBlockManagerPort)
94+
.withProtocol("TCP")
95+
.endPort()
96+
.addNewPort()
97+
.withName(UI_PORT_NAME)
98+
.withContainerPort(driverUIPort)
99+
.withProtocol("TCP")
100+
.endPort()
79101
.addAllToEnv(driverCustomEnvs.asJava)
80102
.addNewEnv()
81103
.withName(ENV_DRIVER_BIND_ADDRESS)

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,15 @@ package org.apache.spark.deploy.k8s.features
1818

1919
import scala.collection.JavaConverters._
2020

21-
import io.fabric8.kubernetes.api.model.LocalObjectReferenceBuilder
21+
import io.fabric8.kubernetes.api.model.{ContainerPort, ContainerPortBuilder, LocalObjectReferenceBuilder}
2222

2323
import org.apache.spark.{SparkConf, SparkFunSuite}
2424
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkPod}
2525
import org.apache.spark.deploy.k8s.Config._
2626
import org.apache.spark.deploy.k8s.Constants._
2727
import org.apache.spark.deploy.k8s.submit.JavaMainAppResource
2828
import org.apache.spark.deploy.k8s.submit.PythonMainAppResource
29+
import org.apache.spark.ui.SparkUI
2930

3031
class BasicDriverFeatureStepSuite extends SparkFunSuite {
3132

@@ -87,6 +88,14 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
8788
assert(configuredPod.container.getImage === "spark-driver:latest")
8889
assert(configuredPod.container.getImagePullPolicy === CONTAINER_IMAGE_PULL_POLICY)
8990

91+
val expectedPortNames = Set(
92+
containerPort(DRIVER_PORT_NAME, DEFAULT_DRIVER_PORT),
93+
containerPort(BLOCK_MANAGER_PORT_NAME, DEFAULT_BLOCKMANAGER_PORT),
94+
containerPort(UI_PORT_NAME, SparkUI.DEFAULT_PORT)
95+
)
96+
val foundPortNames = configuredPod.container.getPorts.asScala.toSet
97+
assert(expectedPortNames === foundPortNames)
98+
9099
assert(configuredPod.container.getEnv.size === 3)
91100
val envs = configuredPod.container
92101
.getEnv
@@ -203,4 +212,11 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
203212
"spark.files" -> "https://localhost:9000/file1.txt,/opt/spark/file2.txt")
204213
assert(additionalProperties === expectedSparkConf)
205214
}
215+
216+
def containerPort(name: String, portNumber: Int): ContainerPort =
217+
new ContainerPortBuilder()
218+
.withName(name)
219+
.withContainerPort(portNumber)
220+
.withProtocol("TCP")
221+
.build()
206222
}

0 commit comments

Comments
 (0)