diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesExecutorSpec.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesExecutorSpec.scala
index 4db9211c1e42..d7d9dc87a210 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesExecutorSpec.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesExecutorSpec.scala
@@ -20,4 +20,5 @@ import io.fabric8.kubernetes.api.model.HasMetadata
 
 private[spark] case class KubernetesExecutorSpec(
     pod: SparkPod,
+    executorPreKubernetesResources: Seq[HasMetadata],
     executorKubernetesResources: Seq[HasMetadata])
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
index bbabd680c1a4..d264ad9080df 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
@@ -23,7 +23,7 @@ import java.util.{Collections, UUID}
 
 import scala.collection.JavaConverters._
 
-import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, EnvVar, EnvVarBuilder, EnvVarSourceBuilder, HasMetadata, OwnerReferenceBuilder, Pod, PodBuilder, Quantity}
+import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, EnvVar, EnvVarBuilder, EnvVarSourceBuilder, HasMetadata, OwnerReferenceBuilder, PersistentVolumeClaim, Pod, PodBuilder, Quantity}
 import io.fabric8.kubernetes.client.KubernetesClient
 import org.apache.commons.codec.binary.Hex
 import org.apache.hadoop.fs.{FileSystem, Path}
@@ -414,4 +414,52 @@ object KubernetesUtils extends Logging {
         .build()
     }
   }
+
+  /**
+   * Create a pre-resource that must exist before pod creation.
+   */
+  @Since("3.5.0")
+  def createPreResource(
+      client: KubernetesClient,
+      resource: HasMetadata,
+      namespace: String): Unit = {
+    resource match {
+      case pvc: PersistentVolumeClaim =>
+        client.persistentVolumeClaims().inNamespace(namespace).resource(pvc).create()
+      case other =>
+        client.resourceList(Seq(other): _*).createOrReplace()
+    }
+  }
+
+  /**
+   * Refresh the OwnerReference of the given resource,
+   * making the driver or executor pod its owner.
+   */
+  @Since("3.5.0")
+  def refreshOwnerReferenceInResource(
+      client: KubernetesClient,
+      resource: HasMetadata,
+      namespace: String,
+      pod: Pod): Unit = {
+    resource match {
+      case pvc: PersistentVolumeClaim =>
+        val createdPVC =
+          client
+            .persistentVolumeClaims()
+            .inNamespace(namespace)
+            .withName(pvc.getMetadata.getName)
+            .get()
+        addOwnerReference(pod, Seq(createdPVC))
+        logDebug(s"Trying to refresh PersistentVolumeClaim ${createdPVC.getMetadata.getName} " +
+          s"with OwnerReference ${createdPVC.getMetadata.getOwnerReferences}")
+        client
+          .persistentVolumeClaims()
+          .inNamespace(namespace)
+          .withName(createdPVC.getMetadata.getName)
+          .patch(createdPVC)
+      case other =>
+        addOwnerReference(pod, Seq(other))
+        client.resourceList(Seq(other): _*).createOrReplace()
+    }
+  }
 }
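The two helpers above split pre-resource handling into a create phase (before the owning pod exists) and an adopt phase (after the pod exists and therefore has the UID an OwnerReference requires). A minimal usage sketch under stated assumptions: a cluster reachable through fabric8's KubernetesClientBuilder, and the hypothetical names DemoPreResource, demo-pvc, and demo-pod, none of which are part of this patch:

import io.fabric8.kubernetes.api.model.{PersistentVolumeClaimBuilder, Pod}
import io.fabric8.kubernetes.client.KubernetesClientBuilder
import org.apache.spark.deploy.k8s.KubernetesUtils

object DemoPreResource {
  def main(args: Array[String]): Unit = {
    val client = new KubernetesClientBuilder().build()
    val pvc = new PersistentVolumeClaimBuilder()
      .withNewMetadata().withName("demo-pvc").endMetadata()
      .build()

    // Phase 1: the claim must exist before the pod that mounts it, otherwise
    // the kubelet has nothing to bind when the pod is scheduled.
    KubernetesUtils.createPreResource(client, pvc, "default")

    // The pod is created elsewhere; fetch it once it exists and has a UID.
    val ownerPod: Pod = client.pods().inNamespace("default").withName("demo-pod").get()

    // Phase 2: patch the claim so the pod owns it; Kubernetes garbage
    // collection then removes the claim together with its owner.
    KubernetesUtils.refreshOwnerReferenceInResource(client, pvc, "default", ownerPod)
  }
}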
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala
index d47024ca9fe0..709428b3d0c6 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala
@@ -29,7 +29,7 @@ private[spark] class MountVolumesFeatureStep(conf: KubernetesConf)
   extends KubernetesFeatureConfigStep {
   import MountVolumesFeatureStep._
 
-  val additionalResources = ArrayBuffer.empty[HasMetadata]
+  val additionalPreResources = ArrayBuffer.empty[HasMetadata]
 
   override def configurePod(pod: SparkPod): SparkPod = {
     val (volumeMounts, volumes) = constructVolumes(conf.volumes).unzip
@@ -82,7 +82,7 @@ private[spark] class MountVolumesFeatureStep(conf: KubernetesConf)
           .replaceAll(PVC_ON_DEMAND, s"${conf.resourceNamePrefix}-driver$PVC_POSTFIX-$i")
       }
       if (storageClass.isDefined && size.isDefined) {
-        additionalResources.append(new PersistentVolumeClaimBuilder()
+        additionalPreResources.append(new PersistentVolumeClaimBuilder()
           .withKind(PVC)
           .withApiVersion("v1")
           .withNewMetadata()
@@ -119,8 +119,8 @@ private[spark] class MountVolumesFeatureStep(conf: KubernetesConf)
     }
   }
 
-  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
-    additionalResources.toSeq
+  override def getAdditionalPreKubernetesResources(): Seq[HasMetadata] = {
+    additionalPreResources.toSeq
   }
 
   private def checkPVCClaimName(claimName: String): Unit = {
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
index 14d3c4d1f42f..ba600587fe34 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
@@ -30,7 +30,7 @@ import org.apache.spark.deploy.SparkApplication
 import org.apache.spark.deploy.k8s._
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.KubernetesUtils.addOwnerReference
+import org.apache.spark.deploy.k8s.KubernetesUtils.{addOwnerReference, createPreResource, refreshOwnerReferenceInResource}
 import org.apache.spark.internal.Logging
 import org.apache.spark.util.Utils
 
@@ -136,14 +136,16 @@ private[spark] class Client(
 
     // setup resources before pod creation
     val preKubernetesResources = resolvedDriverSpec.driverPreKubernetesResources
-    try {
-      kubernetesClient.resourceList(preKubernetesResources: _*).createOrReplace()
-    } catch {
-      case NonFatal(e) =>
-        logError("Please check \"kubectl auth can-i create [resource]\" first." +
-          " It should be yes. And please also check your feature step implementation.")
-        kubernetesClient.resourceList(preKubernetesResources: _*).delete()
-        throw e
+    preKubernetesResources.foreach { resource =>
+      try {
+        createPreResource(kubernetesClient, resource, conf.namespace)
+      } catch {
+        case NonFatal(e) =>
+          logError("Please check \"kubectl auth can-i create [resource]\" first." +
+            " It should be yes. And please also check your feature step implementation.")
+          kubernetesClient.resourceList(Seq(resource): _*).delete()
+          throw e
+      }
     }
 
     var watch: Watch = null
@@ -159,14 +161,16 @@ private[spark] class Client(
     }
 
     // Refresh all pre-resources' owner references
-    try {
-      addOwnerReference(createdDriverPod, preKubernetesResources)
-      kubernetesClient.resourceList(preKubernetesResources: _*).createOrReplace()
-    } catch {
-      case NonFatal(e) =>
-        kubernetesClient.pods().resource(createdDriverPod).delete()
-        kubernetesClient.resourceList(preKubernetesResources: _*).delete()
-        throw e
+    preKubernetesResources.foreach { resource =>
+      try {
+        refreshOwnerReferenceInResource(kubernetesClient, resource, conf.namespace,
+          createdDriverPod)
+      } catch {
+        case NonFatal(e) =>
+          kubernetesClient.pods().resource(createdDriverPod).delete()
+          kubernetesClient.resourceList(Seq(resource): _*).delete()
+          throw e
+      }
     }
 
     // setup resources after pod creation, and refresh all resources' owner references
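The Client change above replaces a single bulk resourceList(...).createOrReplace() with a per-resource loop, so a PersistentVolumeClaim can take the dedicated persistentVolumeClaims() path inside createPreResource while every other kind keeps the generic path. Reduced to its control flow, the pattern is the sketch below; createEach is a hypothetical standalone helper, with create and cleanup standing in for the fabric8 calls in the patch:

import scala.util.control.NonFatal

// Create resources one at a time. On failure, delete only the resource that
// failed and rethrow; resources created earlier in the loop are not rolled
// back here and are reclaimed later via their owner references.
def createEach[R](resources: Seq[R])(create: R => Unit)(cleanup: R => Unit): Unit = {
  resources.foreach { r =>
    try {
      create(r)
    } catch {
      case NonFatal(e) =>
        cleanup(r)
        throw e
    }
  }
}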
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
index 7c92e0ff444d..d8067cd2086b 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
@@ -31,7 +31,7 @@ import org.apache.spark.{SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.KubernetesConf
-import org.apache.spark.deploy.k8s.KubernetesUtils.addOwnerReference
+import org.apache.spark.deploy.k8s.KubernetesUtils.{createPreResource, refreshOwnerReferenceInResource}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.{DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT, DYN_ALLOCATION_MAX_EXECUTORS, EXECUTOR_INSTANCES}
 import org.apache.spark.resource.ResourceProfile
@@ -434,34 +434,55 @@ class ExecutorPodsAllocator(
         .addToContainers(executorPod.container)
         .endSpec()
         .build()
-      val resources = replacePVCsIfNeeded(
-        podWithAttachedContainer, resolvedExecutorSpec.executorKubernetesResources, reusablePVCs)
-      val createdExecutorPod =
-        kubernetesClient.pods().inNamespace(namespace).resource(podWithAttachedContainer).create()
-      try {
-        addOwnerReference(createdExecutorPod, resources)
-        resources
-          .filter(_.getKind == "PersistentVolumeClaim")
-          .foreach { resource =>
-            if (conf.get(KUBERNETES_DRIVER_OWN_PVC) && driverPod.nonEmpty) {
-              addOwnerReference(driverPod.get, Seq(resource))
-            }
-            val pvc = resource.asInstanceOf[PersistentVolumeClaim]
-            logInfo(s"Trying to create PersistentVolumeClaim ${pvc.getMetadata.getName} with " +
-              s"StorageClass ${pvc.getSpec.getStorageClassName}")
-            kubernetesClient.persistentVolumeClaims().inNamespace(namespace).resource(pvc).create()
+      val preResources = replacePVCsIfNeeded(
+        podWithAttachedContainer, resolvedExecutorSpec.executorPreKubernetesResources, reusablePVCs)
+
+      preResources.foreach { resource =>
+        try {
+          createPreResource(kubernetesClient, resource, namespace)
+          if (resource.isInstanceOf[PersistentVolumeClaim]) {
             PVC_COUNTER.incrementAndGet()
           }
+        } catch {
+          case NonFatal(e) =>
+            logError("Please check \"kubectl auth can-i create [resource]\" first." +
+              " It should be yes. And please also check your feature step implementation.")
+            kubernetesClient.resourceList(Seq(resource): _*).delete()
+            throw e
+        }
+      }
+
+      var createdExecutorPod: Pod = null
+      try {
+        createdExecutorPod =
+          kubernetesClient.pods().inNamespace(namespace).resource(podWithAttachedContainer).create()
         newlyCreatedExecutors(newExecutorId) = (resourceProfileId, clock.getTimeMillis())
         logDebug(s"Requested executor with id $newExecutorId from Kubernetes.")
       } catch {
         case NonFatal(e) =>
-          kubernetesClient.pods()
-            .inNamespace(namespace)
-            .resource(createdExecutorPod)
-            .delete()
+          kubernetesClient.resourceList(preResources: _*).delete()
+          logError("Please check \"kubectl auth can-i create pod\" first. It should be yes.")
           throw e
       }
+
+      // Refresh all pre-resources' owner references
+      preResources.foreach { resource =>
+        try {
+          if (resource.isInstanceOf[PersistentVolumeClaim] &&
+              conf.get(KUBERNETES_DRIVER_OWN_PVC) && driverPod.nonEmpty) {
+            refreshOwnerReferenceInResource(kubernetesClient, resource, namespace,
+              driverPod.get)
+          } else {
+            refreshOwnerReferenceInResource(kubernetesClient, resource, namespace,
+              createdExecutorPod)
+          }
+        } catch {
+          case NonFatal(e) =>
+            kubernetesClient.pods().inNamespace(namespace).resource(createdExecutorPod).delete()
+            kubernetesClient.resourceList(Seq(resource): _*).delete()
+            throw e
+        }
+      }
     }
   }
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
index 67aad00f9854..a8a8fbc36a3c 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
@@ -75,15 +75,18 @@ private[spark] class KubernetesExecutorBuilder {
 
     val spec = KubernetesExecutorSpec(
       initialPod,
+      executorPreKubernetesResources = Seq.empty,
       executorKubernetesResources = Seq.empty)
 
     // If using a template this will always get the resources from that and combine
     // them with any Spark conf or ResourceProfile resources.
     features.foldLeft(spec) { case (spec, feature) =>
       val configuredPod = feature.configurePod(spec.pod)
+      val addedPreResources = feature.getAdditionalPreKubernetesResources()
       val addedResources = feature.getAdditionalKubernetesResources()
       KubernetesExecutorSpec(
         configuredPod,
+        spec.executorPreKubernetesResources ++ addedPreResources,
         spec.executorKubernetesResources ++ addedResources)
     }
   }
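One behavioral detail worth calling out in the allocator change: ownership of an on-demand PVC is steered by KUBERNETES_DRIVER_OWN_PVC. When it is set and a driver pod is known, the claim is owned by the driver so it can outlive any single executor and be reused; otherwise it is tied to its executor. A condensed, self-contained restatement of that branch (chooseOwner and driverOwnsPvc are illustrative names, not code from the patch):

import io.fabric8.kubernetes.api.model.{HasMetadata, PersistentVolumeClaim, Pod}

// Pick the pod that should own a pre-resource. `driverOwnsPvc` corresponds to
// conf.get(KUBERNETES_DRIVER_OWN_PVC) in ExecutorPodsAllocator.
def chooseOwner(
    resource: HasMetadata,
    driverOwnsPvc: Boolean,
    driverPod: Option[Pod],
    executorPod: Pod): Pod = {
  if (resource.isInstanceOf[PersistentVolumeClaim] && driverOwnsPvc && driverPod.nonEmpty) {
    driverPod.get // the PVC lives as long as the driver and can be reused by new executors
  } else {
    executorPod // default: the resource is reclaimed with its executor
  }
}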
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/StatefulSetPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/StatefulSetPodsAllocator.scala
index 5eeab5501e47..6d27ef57b3d0 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/StatefulSetPodsAllocator.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/StatefulSetPodsAllocator.scala
@@ -127,10 +127,10 @@ class StatefulSetPodsAllocator(
     val meta = executorPod.pod.getMetadata()
 
     // Resources that need to be created, volumes are per-pod which is all we care about here.
-    val resources = resolvedExecutorSpec.executorKubernetesResources
+    val preResources = resolvedExecutorSpec.executorPreKubernetesResources
     // We'll let PVCs be handled by the statefulset. Note user is responsible for
     // cleaning up PVCs. Future work: integrate with KEP1847 once stabilized.
-    val dynamicVolumeClaims = resources.filter(_.getKind == "PersistentVolumeClaim")
+    val dynamicVolumeClaims = preResources.filter(_.getKind == "PersistentVolumeClaim")
       .map(_.asInstanceOf[PersistentVolumeClaim])
     // Remove the dynamic volumes from our pod
     val dynamicVolumeClaimNames: Set[String] = dynamicVolumeClaims.map(_.getMetadata().getName())
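In the statefulset-based allocator the rename is the whole change: Spark never creates per-pod claims on this path. The dynamicVolumeClaims filtered out above end up as the statefulset's volumeClaimTemplates, and the statefulset controller stamps out one claim per replica. Roughly, in fabric8 terms (a sketch, not code from this patch; attachClaimTemplates is a hypothetical name):

import scala.collection.JavaConverters._
import io.fabric8.kubernetes.api.model.PersistentVolumeClaim
import io.fabric8.kubernetes.api.model.apps.StatefulSet

// Hand the per-pod claims to the statefulset instead of creating them
// directly; Kubernetes then creates one PVC per replica.
def attachClaimTemplates(statefulSet: StatefulSet, claims: Seq[PersistentVolumeClaim]): Unit = {
  statefulSet.getSpec.setVolumeClaimTemplates(claims.asJava)
}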
.withName("executor") + .withApiVersion(POD_API_VERSION) + .withUid("executor-" + POD_UID) + .withKind(POD_KIND) + .withController(true) + .endOwnerReference() + .endMetadata() + .build() + + private def doReturn(value: Any) = org.mockito.Mockito.doReturn(value, Seq.empty: _*) + + @Mock + private var kubernetesClient: KubernetesClient = _ + + @Mock + private var persistentVolumeClaims: PERSISTENT_VOLUME_CLAIMS = _ + + @Mock + private var pvcWithNamespace: PVC_WITH_NAMESPACE = _ + + @Mock + private var pvcResource: io.fabric8.kubernetes.client.dsl.Resource[PersistentVolumeClaim] = _ + + @Mock + private var resourceList: RESOURCE_LIST = _ + + private var createdPvcArgumentCaptor: ArgumentCaptor[PersistentVolumeClaim] = _ + private var createdResourcesArgumentCaptor: ArgumentCaptor[HasMetadata] = _ + + before { + MockitoAnnotations.openMocks(this).close() + when(kubernetesClient.persistentVolumeClaims()).thenReturn(persistentVolumeClaims) + when(persistentVolumeClaims.inNamespace(any())).thenReturn(pvcWithNamespace) + when(pvcWithNamespace.resource(any())).thenReturn(pvcResource) + when(pvcWithNamespace.withName(any())).thenReturn(pvcResource) + when(kubernetesClient.resourceList(any(classOf[HasMetadata]))).thenReturn(resourceList) + createdPvcArgumentCaptor = ArgumentCaptor.forClass(classOf[PersistentVolumeClaim]) + createdResourcesArgumentCaptor = ArgumentCaptor.forClass(classOf[HasMetadata]) + } test("Selects the given container as spark container.") { val sparkPod = KubernetesUtils.selectSparkContainer(POD, Some("second")) @@ -159,4 +240,63 @@ class KubernetesUtilsSuite extends SparkFunSuite with PrivateMethodTester { ("testKey3", null, null))) assert(outputEnvVars == expectedEnvVars) } + + test("SPARK-41781: verify pvc creation as expected") { + KubernetesUtils.createPreResource(kubernetesClient, PVC, NAMESPACE) + verify(pvcResource, times(1)).create() + verify(resourceList, never()).createOrReplace() + } + + test("SPARK-41781: verify resource creation as expected") { + KubernetesUtils.createPreResource(kubernetesClient, CRD, NAMESPACE) + verify(pvcResource, never()).create() + verify(resourceList, times(1)).createOrReplace() + } + + test("SPARK-41781: verify pvc creation exception as expected") { + when(pvcResource.create()).thenThrow(new KubernetesClientException("PVC fails to create")) + intercept[KubernetesClientException] { + KubernetesUtils.createPreResource(kubernetesClient, PVC, NAMESPACE) + } + verify(resourceList, never()).createOrReplace() + } + + test("SPARK-41781: verify resource creation exception as expected") { + when(resourceList.createOrReplace()) + .thenThrow(new KubernetesClientException("Resource fails to create")) + intercept[KubernetesClientException] { + KubernetesUtils.createPreResource(kubernetesClient, CRD, NAMESPACE) + } + verify(pvcResource, never()).create() + } + + test("SPARK-41781: verify ownerReference in PVC refreshed as expected") { + when(pvcResource.get()).thenReturn(PVC) + when(pvcResource.patch(createdPvcArgumentCaptor.capture())).thenReturn(PVC) + + KubernetesUtils.refreshOwnerReferenceInResource( + kubernetesClient, + PVC, + NAMESPACE, + EXECUTOR_POD) + + val pvcWithOwnerReference = createdPvcArgumentCaptor.getValue + assert(pvcWithOwnerReference === PVC_WITH_OWNER_REFERENCES) + verify(resourceList, never()).createOrReplace() + } + + test("SPARK-41781: verify ownerReference in CRD refreshed as expected") { + doReturn(resourceList) + .when(kubernetesClient) + .resourceList(createdResourcesArgumentCaptor.capture()) + 
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
index a066775f7dab..4a37f2e793f8 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
@@ -48,6 +48,8 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
 
   private val driverPodName = "driver"
 
+  private val persistentVolumeClaimName = "pvc"
+
   private val driverPod = new PodBuilder()
     .withNewMetadata()
     .withName(driverPodName)
@@ -57,6 +59,12 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     .endMetadata()
     .build()
 
+  private val pvc = new PersistentVolumeClaimBuilder()
+    .withNewMetadata()
+    .withName(persistentVolumeClaimName)
+    .endMetadata()
+    .build()
+
   private val conf = new SparkConf()
     .set(KUBERNETES_DRIVER_POD_NAME, driverPodName)
     .set(DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT.key, "10s")
@@ -111,6 +119,9 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
   @Mock
   private var schedulerBackend: KubernetesClusterSchedulerBackend = _
 
+  @Mock
+  private var deletableList: RESOURCE_LIST = _
+
   private var snapshotsStore: DeterministicExecutorPodsSnapshotsStore = _
 
   private var podsAllocatorUnderTest: ExecutorPodsAllocator = _
@@ -143,6 +154,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     when(persistentVolumeClaims.inNamespace("default")).thenReturn(pvcWithNamespace)
     when(pvcWithNamespace.withLabel(any(), any())).thenReturn(labeledPersistentVolumeClaims)
     when(pvcWithNamespace.resource(any())).thenReturn(pvcResource)
+    when(pvcWithNamespace.withName(any())).thenReturn(pvcResource)
     when(labeledPersistentVolumeClaims.list()).thenReturn(persistentVolumeClaimList)
     when(persistentVolumeClaimList.getItems).thenReturn(Seq.empty[PersistentVolumeClaim].asJava)
   }
@@ -736,7 +748,8 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
       val k8sConf: KubernetesExecutorConf = invocation.getArgument(0)
       KubernetesExecutorSpec(
         executorPodWithIdAndVolume(k8sConf.executorId.toInt, k8sConf.resourceProfileId),
-        Seq(persistentVolumeClaim("pvc-0", "gp2", "200Gi")))
+        Seq(persistentVolumeClaim("pvc-0", "gp2", "200Gi")),
+        Seq.empty)
     })
 
     podsAllocatorUnderTest = new ExecutorPodsAllocator(
@@ -832,8 +845,11 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
       val k8sConf: KubernetesExecutorConf = invocation.getArgument(0)
       KubernetesExecutorSpec(
         executorPodWithIdAndVolume(k8sConf.executorId.toInt, k8sConf.resourceProfileId),
-        Seq(persistentVolumeClaim("pvc-0", "gp3", "200Gi")))
+        Seq(persistentVolumeClaim("pvc-0", "gp3", "200Gi")),
+        Seq.empty)
     })
+    when(pvcResource.get).thenReturn(pvc)
+    when(pvcResource.patch(any(classOf[PersistentVolumeClaim]))).thenReturn(pvc)
 
     podsAllocatorUnderTest = new ExecutorPodsAllocator(
       confWithPVC, secMgr, executorBuilder,
@@ -901,7 +917,8 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
       val k8sConf: KubernetesExecutorConf = invocation.getArgument(0)
       KubernetesExecutorSpec(
         executorPodWithIdAndVolume(k8sConf.executorId.toInt, k8sConf.resourceProfileId),
-        Seq(persistentVolumeClaim("pvc-0", "gp3", "200Gi")))
+        Seq(persistentVolumeClaim("pvc-0", "gp3", "200Gi")),
+        Seq.empty)
     })
 
     podsAllocatorUnderTest = new ExecutorPodsAllocator(
@@ -916,9 +933,11 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     assert(podsAllocatorUnderTest.invokePrivate(counter).get() === 0)
 
     when(pvcResource.create()).thenThrow(new KubernetesClientException("PVC fails to create"))
+    when(kubernetesClient.resourceList(any(classOf[HasMetadata]))).thenReturn(deletableList)
     intercept[KubernetesClientException] {
      podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1))
     }
+    verify(deletableList, times(1)).delete()
     assert(podsAllocatorUnderTest.invokePrivate(counter).get() === 0)
     assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0)
   }
@@ -927,6 +946,6 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     (invocation: InvocationOnMock) => {
       val k8sConf: KubernetesExecutorConf = invocation.getArgument(0)
       KubernetesExecutorSpec(executorPodWithId(k8sConf.executorId.toInt,
-        k8sConf.resourceProfileId.toInt), Seq.empty)
+        k8sConf.resourceProfileId.toInt), Seq.empty, Seq.empty)
     }
 }
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/StatefulSetAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/StatefulSetAllocatorSuite.scala
index f74d2c9feee0..76d8d10e9c96 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/StatefulSetAllocatorSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/StatefulSetAllocatorSuite.scala
@@ -103,7 +103,7 @@ class StatefulSetAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     (invocation: InvocationOnMock) => {
       val k8sConf: KubernetesExecutorConf = invocation.getArgument(0)
       KubernetesExecutorSpec(executorPodWithId(0,
-        k8sConf.resourceProfileId.toInt), Seq.empty)
+        k8sConf.resourceProfileId.toInt), Seq.empty, Seq.empty)
     }
 
   before {