diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index 274b859fef96..d6dc56f9d9d1 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -439,6 +439,7 @@ private[spark] object Config extends Logging { val KUBERNETES_VOLUMES_MOUNT_READONLY_KEY = "mount.readOnly" val KUBERNETES_VOLUMES_OPTIONS_PATH_KEY = "options.path" val KUBERNETES_VOLUMES_OPTIONS_CLAIM_NAME_KEY = "options.claimName" + val KUBERNETES_VOLUMES_OPTIONS_CLAIM_STORAGE_CLASS_KEY = "options.storageClass" val KUBERNETES_VOLUMES_OPTIONS_MEDIUM_KEY = "options.medium" val KUBERNETES_VOLUMES_OPTIONS_SIZE_LIMIT_KEY = "options.sizeLimit" val KUBERNETES_VOLUMES_OPTIONS_SERVER_KEY = "options.server" diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesExecutorSpec.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesExecutorSpec.scala new file mode 100644 index 000000000000..4db9211c1e42 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesExecutorSpec.scala @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s + +import io.fabric8.kubernetes.api.model.HasMetadata + +private[spark] case class KubernetesExecutorSpec( + pod: SparkPod, + executorKubernetesResources: Seq[HasMetadata]) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala index c49f4a15de97..e8bf8f9c9b50 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala @@ -19,11 +19,11 @@ package org.apache.spark.deploy.k8s import java.io.{File, IOException} import java.net.URI import java.security.SecureRandom -import java.util.UUID +import java.util.{Collections, UUID} import scala.collection.JavaConverters._ -import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod, PodBuilder, Quantity} +import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, HasMetadata, OwnerReferenceBuilder, Pod, PodBuilder, Quantity} import io.fabric8.kubernetes.client.KubernetesClient import org.apache.commons.codec.binary.Hex import org.apache.hadoop.fs.{FileSystem, Path} @@ -323,4 +323,22 @@ private[spark] object KubernetesUtils extends Logging { .build() } } + + // Add a OwnerReference to the given resources making the pod an owner of them so when + // the pod is deleted, the resources are garbage collected. + def addOwnerReference(pod: Pod, resources: Seq[HasMetadata]): Unit = { + if (pod != null) { + val reference = new OwnerReferenceBuilder() + .withName(pod.getMetadata.getName) + .withApiVersion(pod.getApiVersion) + .withUid(pod.getMetadata.getUid) + .withKind(pod.getKind) + .withController(true) + .build() + resources.foreach { resource => + val originalMetadata = resource.getMetadata + originalMetadata.setOwnerReferences(Collections.singletonList(reference)) + } + } + } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala index f9faa435c81c..3f7355de1891 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala @@ -21,7 +21,10 @@ private[spark] sealed trait KubernetesVolumeSpecificConf private[spark] case class KubernetesHostPathVolumeConf(hostPath: String) extends KubernetesVolumeSpecificConf -private[spark] case class KubernetesPVCVolumeConf(claimName: String) +private[spark] case class KubernetesPVCVolumeConf( + claimName: String, + storageClass: Option[String] = None, + size: Option[String] = None) extends KubernetesVolumeSpecificConf private[spark] case class KubernetesEmptyDirVolumeConf( diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala index 7f821d37ac81..77921f6338c7 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala @@ -71,7 +71,13 @@ private[spark] object KubernetesVolumeUtils { case KUBERNETES_VOLUMES_PVC_TYPE => val claimNameKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_CLAIM_NAME_KEY" - KubernetesPVCVolumeConf(options(claimNameKey)) + val storageClassKey = + s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_CLAIM_STORAGE_CLASS_KEY" + val sizeLimitKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_SIZE_LIMIT_KEY" + KubernetesPVCVolumeConf( + options(claimNameKey), + options.get(storageClassKey), + options.get(sizeLimitKey)) case KUBERNETES_VOLUMES_EMPTYDIR_TYPE => val mediumKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_MEDIUM_KEY" diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala index 94b5c37f96e3..fe4717d09951 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala @@ -16,6 +16,9 @@ */ package org.apache.spark.deploy.k8s.features +import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer + import io.fabric8.kubernetes.api.model._ import org.apache.spark.deploy.k8s._ @@ -23,6 +26,9 @@ import org.apache.spark.deploy.k8s.Constants.ENV_EXECUTOR_ID private[spark] class MountVolumesFeatureStep(conf: KubernetesConf) extends KubernetesFeatureConfigStep { + import MountVolumesFeatureStep._ + + val additionalResources = ArrayBuffer.empty[HasMetadata] override def configurePod(pod: SparkPod): SparkPod = { val (volumeMounts, volumes) = constructVolumes(conf.volumes).unzip @@ -43,7 +49,7 @@ private[spark] class MountVolumesFeatureStep(conf: KubernetesConf) private def constructVolumes( volumeSpecs: Iterable[KubernetesVolumeSpec] ): Iterable[(VolumeMount, Volume)] = { - volumeSpecs.map { spec => + volumeSpecs.zipWithIndex.map { case (spec, i) => val volumeMount = new VolumeMountBuilder() .withMountPath(spec.mountPath) .withReadOnly(spec.mountReadOnly) @@ -57,10 +63,32 @@ private[spark] class MountVolumesFeatureStep(conf: KubernetesConf) new VolumeBuilder() .withHostPath(new HostPathVolumeSource(hostPath, "")) - case KubernetesPVCVolumeConf(claimNameTemplate) => + case KubernetesPVCVolumeConf(claimNameTemplate, storageClass, size) => val claimName = conf match { case c: KubernetesExecutorConf => - claimNameTemplate.replaceAll(ENV_EXECUTOR_ID, c.executorId) + val claimName = claimNameTemplate + .replaceAll(PVC_ON_DEMAND, + s"${conf.resourceNamePrefix}-exec-${c.executorId}$PVC_POSTFIX-$i") + .replaceAll(ENV_EXECUTOR_ID, c.executorId) + + if (storageClass.isDefined && size.isDefined) { + additionalResources.append(new PersistentVolumeClaimBuilder() + .withKind(PVC) + .withApiVersion("v1") + .withNewMetadata() + .withName(claimName) + .endMetadata() + .withNewSpec() + .withStorageClassName(storageClass.get) + .withAccessModes(PVC_ACCESS_MODE) + .withResources(new ResourceRequirementsBuilder() + .withRequests(Map("storage" -> new Quantity(size.get)).asJava).build()) + .endSpec() + .build()) + } + + claimName + case _ => claimNameTemplate } @@ -84,4 +112,15 @@ private[spark] class MountVolumesFeatureStep(conf: KubernetesConf) (volumeMount, volume) } } + + override def getAdditionalKubernetesResources(): Seq[HasMetadata] = { + additionalResources + } +} + +private[spark] object MountVolumesFeatureStep { + val PVC_ON_DEMAND = "OnDemand" + val PVC = "PersistentVolumeClaim" + val PVC_POSTFIX = "-pvc" + val PVC_ACCESS_MODE = "ReadWriteOnce" } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala index 713d35dcf64f..93caa70e085c 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala @@ -20,18 +20,20 @@ import java.io.StringWriter import java.util.{Collections, UUID} import java.util.Properties +import scala.collection.mutable +import scala.util.control.Breaks._ +import scala.util.control.NonFatal + import io.fabric8.kubernetes.api.model._ import io.fabric8.kubernetes.client.{KubernetesClient, Watch} import io.fabric8.kubernetes.client.Watcher.Action -import scala.collection.mutable -import scala.util.control.NonFatal -import util.control.Breaks._ import org.apache.spark.SparkConf import org.apache.spark.deploy.SparkApplication import org.apache.spark.deploy.k8s._ import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.KubernetesUtils.addOwnerReference import org.apache.spark.internal.Logging import org.apache.spark.util.Utils @@ -134,7 +136,7 @@ private[spark] class Client( val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod) try { val otherKubernetesResources = resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap) - addDriverOwnerReference(createdDriverPod, otherKubernetesResources) + addOwnerReference(createdDriverPod, otherKubernetesResources) kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace() } catch { case NonFatal(e) => @@ -163,22 +165,6 @@ private[spark] class Client( } } - // Add a OwnerReference to the given resources making the driver pod an owner of them so when - // the driver pod is deleted, the resources are garbage collected. - private def addDriverOwnerReference(driverPod: Pod, resources: Seq[HasMetadata]): Unit = { - val driverPodOwnerReference = new OwnerReferenceBuilder() - .withName(driverPod.getMetadata.getName) - .withApiVersion(driverPod.getApiVersion) - .withUid(driverPod.getMetadata.getUid) - .withKind(driverPod.getKind) - .withController(true) - .build() - resources.foreach { resource => - val originalMetadata = resource.getMetadata - originalMetadata.setOwnerReferences(Collections.singletonList(driverPodOwnerReference)) - } - } - // Build a Config Map that will house spark conf properties in a single file for spark-submit private def buildConfigMap(configMapName: String, conf: Map[String, String]): ConfigMap = { val properties = new Properties() diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala index b6ea1faeda97..2bf8685038cf 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala @@ -18,14 +18,17 @@ package org.apache.spark.scheduler.cluster.k8s import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicLong} -import io.fabric8.kubernetes.api.model.PodBuilder -import io.fabric8.kubernetes.client.KubernetesClient import scala.collection.mutable +import scala.util.control.NonFatal + +import io.fabric8.kubernetes.api.model.{HasMetadata, PersistentVolumeClaim, PodBuilder} +import io.fabric8.kubernetes.client.KubernetesClient import org.apache.spark.{SecurityManager, SparkConf, SparkException} import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.deploy.k8s.KubernetesConf +import org.apache.spark.deploy.k8s.KubernetesUtils.addOwnerReference import org.apache.spark.internal.Logging import org.apache.spark.util.{Clock, Utils} @@ -212,16 +215,33 @@ private[spark] class ExecutorPodsAllocator( newExecutorId.toString, applicationId, driverPod) - val executorPod = executorBuilder.buildFromFeatures(executorConf, secMgr, + val resolvedExecutorSpec = executorBuilder.buildFromFeatures(executorConf, secMgr, kubernetesClient) + val executorPod = resolvedExecutorSpec.pod val podWithAttachedContainer = new PodBuilder(executorPod.pod) .editOrNewSpec() .addToContainers(executorPod.container) .endSpec() .build() - kubernetesClient.pods().create(podWithAttachedContainer) - newlyCreatedExecutors(newExecutorId) = clock.getTimeMillis() - logDebug(s"Requested executor with id $newExecutorId from Kubernetes.") + val createdExecutorPod = kubernetesClient.pods().create(podWithAttachedContainer) + try { + val resources = resolvedExecutorSpec.executorKubernetesResources + addOwnerReference(createdExecutorPod, resources) + resources + .filter(_.getKind == "PersistentVolumeClaim") + .foreach { resource => + val pvc = resource.asInstanceOf[PersistentVolumeClaim] + logInfo(s"Trying to create PersistentVolumeClaim ${pvc.getMetadata.getName} with " + + s"StorageClass ${pvc.getSpec.getStorageClassName}") + kubernetesClient.persistentVolumeClaims().create(pvc) + } + newlyCreatedExecutors(newExecutorId) = clock.getTimeMillis() + logDebug(s"Requested executor with id $newExecutorId from Kubernetes.") + } catch { + case NonFatal(e) => + kubernetesClient.pods().delete(createdExecutorPod) + throw e + } } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala index 22bff2c80733..b5f21fe69f52 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala @@ -29,7 +29,7 @@ private[spark] class KubernetesExecutorBuilder { def buildFromFeatures( conf: KubernetesExecutorConf, secMgr: SecurityManager, - client: KubernetesClient): SparkPod = { + client: KubernetesClient): KubernetesExecutorSpec = { val initialPod = conf.get(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE) .map { file => KubernetesUtils.loadPodFromTemplate( @@ -47,7 +47,17 @@ private[spark] class KubernetesExecutorBuilder { new MountVolumesFeatureStep(conf), new LocalDirsFeatureStep(conf)) - features.foldLeft(initialPod) { case (pod, feature) => feature.configurePod(pod) } + val spec = KubernetesExecutorSpec( + initialPod, + executorKubernetesResources = Seq.empty) + + features.foldLeft(spec) { case (spec, feature) => + val configuredPod = feature.configurePod(spec.pod) + val addedResources = feature.getAdditionalKubernetesResources() + KubernetesExecutorSpec( + configuredPod, + spec.executorKubernetesResources ++ addedResources) + } } } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesTestConf.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesTestConf.scala index d6871a6c2866..83d9481e6f2b 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesTestConf.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesTestConf.scala @@ -114,9 +114,12 @@ object KubernetesTestConf { (KUBERNETES_VOLUMES_HOSTPATH_TYPE, Map(KUBERNETES_VOLUMES_OPTIONS_PATH_KEY -> path)) - case KubernetesPVCVolumeConf(claimName) => + case KubernetesPVCVolumeConf(claimName, storageClass, sizeLimit) => + val sconf = storageClass + .map { s => (KUBERNETES_VOLUMES_OPTIONS_CLAIM_STORAGE_CLASS_KEY, s) }.toMap + val lconf = sizeLimit.map { l => (KUBERNETES_VOLUMES_OPTIONS_SIZE_LIMIT_KEY, l) }.toMap (KUBERNETES_VOLUMES_PVC_TYPE, - Map(KUBERNETES_VOLUMES_OPTIONS_CLAIM_NAME_KEY -> claimName)) + Map(KUBERNETES_VOLUMES_OPTIONS_CLAIM_NAME_KEY -> claimName) ++ sconf ++ lconf) case KubernetesEmptyDirVolumeConf(medium, sizeLimit) => val mconf = medium.map { m => (KUBERNETES_VOLUMES_OPTIONS_MEDIUM_KEY, m) }.toMap diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala index a9a1ec46a6e6..df7616271681 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala @@ -89,6 +89,23 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite { assert(executorPVC.getClaimName === s"pvc-spark-${KubernetesTestConf.EXECUTOR_ID}") } + test("Create and mount persistentVolumeClaims in executors") { + val volumeConf = KubernetesVolumeSpec( + "testVolume", + "/tmp", + "", + true, + KubernetesPVCVolumeConf(MountVolumesFeatureStep.PVC_ON_DEMAND) + ) + val executorConf = KubernetesTestConf.createExecutorConf(volumes = Seq(volumeConf)) + val executorStep = new MountVolumesFeatureStep(executorConf) + val executorPod = executorStep.configurePod(SparkPod.initialPod()) + + assert(executorPod.pod.getSpec.getVolumes.size() === 1) + val executorPVC = executorPod.pod.getSpec.getVolumes.get(0).getPersistentVolumeClaim + assert(executorPVC.getClaimName.endsWith("-exec-1-pvc-0")) + } + test("Mounts emptyDir") { val volumeConf = KubernetesVolumeSpec( "testVolume", diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala index a0abded3823b..e4b36e46594f 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala @@ -27,7 +27,7 @@ import org.mockito.stubbing.Answer import org.scalatest.BeforeAndAfter import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite} -import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, SparkPod} +import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, KubernetesExecutorSpec, SparkPod} import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.deploy.k8s.Fabric8Aliases._ @@ -202,9 +202,9 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { assert(!podsAllocatorUnderTest.isDeleted("4")) } - private def executorPodAnswer(): Answer[SparkPod] = + private def executorPodAnswer(): Answer[KubernetesExecutorSpec] = (invocation: InvocationOnMock) => { val k8sConf: KubernetesExecutorConf = invocation.getArgument(0) - executorPodWithId(k8sConf.executorId.toInt) + KubernetesExecutorSpec(executorPodWithId(k8sConf.executorId.toInt), Seq.empty) } } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala index bd716174a827..796e2126e9e3 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala @@ -32,7 +32,7 @@ class KubernetesExecutorBuilderSuite extends PodBuilderSuite { sparkConf.set("spark.driver.host", "https://driver.host.com") val conf = KubernetesTestConf.createExecutorConf(sparkConf = sparkConf) val secMgr = new SecurityManager(sparkConf) - new KubernetesExecutorBuilder().buildFromFeatures(conf, secMgr, client) + new KubernetesExecutorBuilder().buildFromFeatures(conf, secMgr, client).pod } }