From b6e6c2618d59adcaf4348b15e88b007e503e9ae2 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Sat, 12 Jun 2021 11:28:39 +0800 Subject: [PATCH 1/2] [MINOR] Print the driver pod name instead of Some(name) if absent --- .../cluster/k8s/ExecutorPodsAllocator.scala | 14 +++++--------- .../cluster/k8s/ExecutorPodsAllocatorSuite.scala | 14 +++++++++++++- 2 files changed, 18 insertions(+), 10 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala index ad489edef62ea..62144188dadbe 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala @@ -67,18 +67,14 @@ private[spark] class ExecutorPodsAllocator( private val namespace = conf.get(KUBERNETES_NAMESPACE) - private val kubernetesDriverPodName = conf - .get(KUBERNETES_DRIVER_POD_NAME) - private val shouldDeleteExecutors = conf.get(KUBERNETES_DELETE_EXECUTORS) - val driverPod = kubernetesDriverPodName - .map(name => Option(kubernetesClient.pods() - .withName(name) - .get()) + val driverPod = conf.get(KUBERNETES_DRIVER_POD_NAME).map { name => + Option(kubernetesClient.pods().withName(name).get()) .getOrElse(throw new SparkException( - s"No pod was found named $kubernetesDriverPodName in the cluster in the " + - s"namespace $namespace (this was supposed to be the driver pod.)."))) + s"No pod was found named $name in the cluster in the " + + s"namespace $namespace (this was supposed to be the driver pod.).")) + } // Executor IDs that have been requested from Kubernetes but have not been detected in any // snapshot yet. Mapped to the (ResourceProfile id, timestamp) when they were created. diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala index 207094ea75c9e..a54f1a105b9e7 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala @@ -32,7 +32,7 @@ import org.mockito.stubbing.Answer import org.scalatest.BeforeAndAfter import org.scalatest.PrivateMethodTester._ -import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite} +import org.apache.spark.{SecurityManager, SparkConf, SparkException, SparkFunSuite} import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, KubernetesExecutorSpec} import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ @@ -653,6 +653,18 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { verify(persistentVolumeClaims, never()).create(any()) } + test("print the pod name instead of Some(name) if pod is absent") { + val nonexistentPod = "i-do-not-exist" + val conf = new SparkConf().set(KUBERNETES_DRIVER_POD_NAME, nonexistentPod) + when(kubernetesClient.pods()).thenReturn(podOperations) + when(podOperations.withName(nonexistentPod)).thenReturn(driverPodOperations) + when(driverPodOperations.get()).thenReturn(null) + val e = intercept[SparkException](new ExecutorPodsAllocator( + conf, secMgr, executorBuilder, kubernetesClient, snapshotsStore, waitForExecutorPodsClock)) + assert(e.getMessage.contains("No pod was found named i-do-not-exist in the cluster in the" + + " namespace default")) + } + private def executorPodAnswer(): Answer[KubernetesExecutorSpec] = (invocation: InvocationOnMock) => { val k8sConf: KubernetesExecutorConf = invocation.getArgument(0) From 0f23311bbf5681b612f19c47fc488b53f0d04ace Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Sun, 13 Jun 2021 10:33:23 +0800 Subject: [PATCH 2/2] [MINOR] Print the driver pod name instead of Some(name) if absent --- .../cluster/k8s/ExecutorPodsAllocator.scala | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala index 62144188dadbe..d6dc13e22ea4f 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala @@ -67,14 +67,18 @@ private[spark] class ExecutorPodsAllocator( private val namespace = conf.get(KUBERNETES_NAMESPACE) + private val kubernetesDriverPodName = conf + .get(KUBERNETES_DRIVER_POD_NAME) + private val shouldDeleteExecutors = conf.get(KUBERNETES_DELETE_EXECUTORS) - val driverPod = conf.get(KUBERNETES_DRIVER_POD_NAME).map { name => - Option(kubernetesClient.pods().withName(name).get()) + val driverPod = kubernetesDriverPodName + .map(name => Option(kubernetesClient.pods() + .withName(name) + .get()) .getOrElse(throw new SparkException( s"No pod was found named $name in the cluster in the " + - s"namespace $namespace (this was supposed to be the driver pod.).")) - } + s"namespace $namespace (this was supposed to be the driver pod.)."))) // Executor IDs that have been requested from Kubernetes but have not been detected in any // snapshot yet. Mapped to the (ResourceProfile id, timestamp) when they were created.