From c689fb627905080595e482b5910b42007dc98c5d Mon Sep 17 00:00:00 2001 From: Dmitriy Drinfeld Date: Wed, 2 Sep 2020 11:30:51 -0400 Subject: [PATCH 1/3] debug fix, with debug statements --- .../submit/KubernetesClientApplication.scala | 60 ++++++++++++------- .../k8s/submit/LoggingPodStatusWatcher.scala | 39 +++++++++++- .../spark/deploy/k8s/submit/ClientSuite.scala | 2 +- 3 files changed, 75 insertions(+), 26 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala index edeaa380194ac..bc12f5345d39e 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala @@ -17,13 +17,15 @@ package org.apache.spark.deploy.k8s.submit import java.io.StringWriter +import java.net.HttpURLConnection.HTTP_GONE import java.util.{Collections, UUID} import java.util.Properties import io.fabric8.kubernetes.api.model._ -import io.fabric8.kubernetes.client.KubernetesClient +import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watch} import scala.collection.mutable import scala.util.control.NonFatal +import util.control.Breaks._ import org.apache.spark.SparkConf import org.apache.spark.deploy.SparkApplication @@ -133,29 +135,41 @@ private[spark] class Client( .endVolume() .endSpec() .build() - Utils.tryWithResource( - kubernetesClient - .pods() - .withName(resolvedDriverPod.getMetadata.getName) - .watch(watcher)) { _ => - val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod) - try { - val otherKubernetesResources = - resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap) - addDriverOwnerReference(createdDriverPod, otherKubernetesResources) - kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace() - } catch { - case NonFatal(e) => - kubernetesClient.pods().delete(createdDriverPod) - throw e - } - if (waitForAppCompletion) { - logInfo(s"Waiting for application $appName to finish...") - watcher.awaitCompletion() - logInfo(s"Application $appName finished.") - } else { - logInfo(s"Deployed Spark application $appName into Kubernetes.") + val driverPodName = resolvedDriverPod.getMetadata.getName + logInfo("-------------------------- DEBUG --------------------------------") + var watch: Watch = null + val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod) + logInfo("------------------------ DEBUG created pod -----------------------") + try { + val otherKubernetesResources = resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap) + addDriverOwnerReference(createdDriverPod, otherKubernetesResources) + kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace() + + logInfo("---------------- DEBUG created other resources -----------------") + } catch { + case NonFatal(e) => + kubernetesClient.pods().delete(createdDriverPod) + logInfo("---------------- DEBUG failed here -----------------") + throw e + } + val sId = Seq(kubernetesConf.namespace(), driverPodName).mkString(":") + logInfo("---------------- DEBUG sId" + sId + " -----------------") + breakable { + while (true) { + try { + logInfo("---------------- DEBUG pre-watch here -----------------") + watch = kubernetesClient + .pods() + .withName(driverPodName) + .watch(watcher) + logInfo("---------------- DEBUG pre-watch-stop here -----------------") + watcher.watchOrStop(sId) + break + } catch { + case e: KubernetesClientException if e.getCode == HTTP_GONE => + logInfo("Resource version changed rerunning the watcher") + } } } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala index 4a7d3d42d23db..355b47484e290 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala @@ -16,6 +16,7 @@ */ package org.apache.spark.deploy.k8s.submit +import java.net.HttpURLConnection.HTTP_GONE import java.util.concurrent.{CountDownLatch, TimeUnit} import scala.collection.JavaConverters._ @@ -28,8 +29,10 @@ import org.apache.spark.SparkException import org.apache.spark.internal.Logging import org.apache.spark.util.ThreadUtils + private[k8s] trait LoggingPodStatusWatcher extends Watcher[Pod] { def awaitCompletion(): Unit + def watchOrStop(sId: String): Boolean } /** @@ -45,6 +48,8 @@ private[k8s] class LoggingPodStatusWatcherImpl( maybeLoggingInterval: Option[Long]) extends LoggingPodStatusWatcher with Logging { + private var resourceTooOldReceived: Boolean = false + private var podCompleted = false private val podCompletedFuture = new CountDownLatch(1) // start timer for periodic logging private val scheduler = @@ -78,8 +83,13 @@ private[k8s] class LoggingPodStatusWatcherImpl( } override def onClose(e: KubernetesClientException): Unit = { - logDebug(s"Stopping watching application $appId with last-observed phase $phase") - closeWatch() + logInfo(s"Stopping watching application $appId with last-observed phase $phase") + if (e != null && e.getCode==HTTP_GONE) { + resourceTooOldReceived = true + logInfo(s"Got HTTP Gone code, resource version changed in k8s api: $e") + } else { + closeWatch() + } } private def logShortStatus() = { @@ -97,6 +107,7 @@ private[k8s] class LoggingPodStatusWatcherImpl( private def closeWatch(): Unit = { podCompletedFuture.countDown() scheduler.shutdown() + podCompleted = true } private def formatPodState(pod: Pod): String = { @@ -177,4 +188,28 @@ private[k8s] class LoggingPodStatusWatcherImpl( private def formatTime(time: String): String = { if (time != null || time != "") time else "N/A" } + + override def watchOrStop(sId: String): Boolean = if (!hasCompleted()) { + logInfo(s"Waiting for application ${appId} with submission ID $sId to finish...") + val interval = maybeLoggingInterval + synchronized { + while (!podCompleted && !resourceTooOldReceived) { + wait(interval.get) + logInfo(s"Application status for $appId (phase: $phase)") + } + } + + if(podCompleted) { + logInfo( + pod.map { p => s"Container final statuses:\n\n${containersDescription(p)}" } + .getOrElse("No containers were found in the driver pod.")) + logInfo(s"Application ${appId} with submission ID $sId finished") + } + podCompleted + } else { + logInfo(s"Deployed Spark application ${appId} with submission ID $sId into Kubernetes") + logInfo(s"It seems we end up here, because we never want to wait for completion...") + // Always act like the application has completed since we don't want to wait for app completion + true + } } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala index 4d8e79189ff32..b1132ed0467f9 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala @@ -205,6 +205,6 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter { loggingPodStatusWatcher, KUBERNETES_RESOURCE_PREFIX) submissionClient.run() - verify(loggingPodStatusWatcher).awaitCompletion() + verify(loggingPodStatusWatcher).watchOrStop("default:driver") } } From 7ec38e8c79e5e5d78f9b233fa13611531d8babbc Mon Sep 17 00:00:00 2001 From: Dmitriy Drinfeld Date: Wed, 2 Sep 2020 12:44:25 -0400 Subject: [PATCH 2/3] fixes waitForCompletion check --- .../k8s/submit/KubernetesClientApplication.scala | 12 +++--------- .../deploy/k8s/submit/LoggingPodStatusWatcher.scala | 5 +++-- 2 files changed, 6 insertions(+), 11 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala index bc12f5345d39e..e634cbe03dbe2 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala @@ -137,33 +137,25 @@ private[spark] class Client( .build() val driverPodName = resolvedDriverPod.getMetadata.getName - logInfo("-------------------------- DEBUG --------------------------------") var watch: Watch = null val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod) - logInfo("------------------------ DEBUG created pod -----------------------") try { val otherKubernetesResources = resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap) addDriverOwnerReference(createdDriverPod, otherKubernetesResources) kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace() - - logInfo("---------------- DEBUG created other resources -----------------") } catch { case NonFatal(e) => kubernetesClient.pods().delete(createdDriverPod) - logInfo("---------------- DEBUG failed here -----------------") throw e } val sId = Seq(kubernetesConf.namespace(), driverPodName).mkString(":") - logInfo("---------------- DEBUG sId" + sId + " -----------------") breakable { while (true) { try { - logInfo("---------------- DEBUG pre-watch here -----------------") watch = kubernetesClient .pods() .withName(driverPodName) .watch(watcher) - logInfo("---------------- DEBUG pre-watch-stop here -----------------") watcher.watchOrStop(sId) break } catch { @@ -244,7 +236,9 @@ private[spark] class KubernetesClientApplication extends SparkApplication { val master = KubernetesUtils.parseMasterUrl(sparkConf.get("spark.master")) val loggingInterval = if (waitForAppCompletion) Some(sparkConf.get(REPORT_INTERVAL)) else None - val watcher = new LoggingPodStatusWatcherImpl(kubernetesAppId, loggingInterval) + val watcher = new LoggingPodStatusWatcherImpl(kubernetesAppId, + loggingInterval, + waitForAppCompletion) Utils.tryWithResource(SparkKubernetesClientFactory.createKubernetesClient( master, diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala index 355b47484e290..3834e6d385de2 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala @@ -45,7 +45,8 @@ private[k8s] trait LoggingPodStatusWatcher extends Watcher[Pod] { */ private[k8s] class LoggingPodStatusWatcherImpl( appId: String, - maybeLoggingInterval: Option[Long]) + maybeLoggingInterval: Option[Long], + waitForCompletion: Boolean) extends LoggingPodStatusWatcher with Logging { private var resourceTooOldReceived: Boolean = false @@ -189,7 +190,7 @@ private[k8s] class LoggingPodStatusWatcherImpl( if (time != null || time != "") time else "N/A" } - override def watchOrStop(sId: String): Boolean = if (!hasCompleted()) { + override def watchOrStop(sId: String): Boolean = if (waitForCompletion) { logInfo(s"Waiting for application ${appId} with submission ID $sId to finish...") val interval = maybeLoggingInterval synchronized { From 0f6deb53f66c7ea1a64d95fd5888c5fcf4d01dae Mon Sep 17 00:00:00 2001 From: Dmitriy Drinfeld Date: Fri, 11 Sep 2020 17:46:46 -0400 Subject: [PATCH 3/3] adds rest of implementation, reduce logging --- .../submit/KubernetesClientApplication.scala | 25 +++++++++++-------- .../k8s/submit/LoggingPodStatusWatcher.scala | 25 +++++++++++-------- .../spark/deploy/k8s/submit/ClientSuite.scala | 1 + 3 files changed, 31 insertions(+), 20 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala index e634cbe03dbe2..83d46ceb3ae97 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala @@ -23,6 +23,7 @@ import java.util.Properties import io.fabric8.kubernetes.api.model._ import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watch} +import io.fabric8.kubernetes.client.Watcher.Action import scala.collection.mutable import scala.util.control.NonFatal import util.control.Breaks._ @@ -151,16 +152,20 @@ private[spark] class Client( val sId = Seq(kubernetesConf.namespace(), driverPodName).mkString(":") breakable { while (true) { - try { - watch = kubernetesClient - .pods() - .withName(driverPodName) - .watch(watcher) - watcher.watchOrStop(sId) - break - } catch { - case e: KubernetesClientException if e.getCode == HTTP_GONE => - logInfo("Resource version changed rerunning the watcher") + val podWithName = kubernetesClient + .pods() + .withName(driverPodName) + + watcher.reset() + + watch = podWithName.watch(watcher) + + watcher.eventReceived(Action.MODIFIED, podWithName.get()) + + if(watcher.watchOrStop(sId)) { + logInfo(s"Stop watching as the pod has completed.") + watch.close() + break } } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala index 3834e6d385de2..6c596ac34cc53 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala @@ -31,7 +31,7 @@ import org.apache.spark.util.ThreadUtils private[k8s] trait LoggingPodStatusWatcher extends Watcher[Pod] { - def awaitCompletion(): Unit + def reset(): Unit def watchOrStop(sId: String): Boolean } @@ -83,12 +83,17 @@ private[k8s] class LoggingPodStatusWatcherImpl( } } + override def reset(): Unit = { + resourceTooOldReceived = false + } + override def onClose(e: KubernetesClientException): Unit = { logInfo(s"Stopping watching application $appId with last-observed phase $phase") if (e != null && e.getCode==HTTP_GONE) { resourceTooOldReceived = true logInfo(s"Got HTTP Gone code, resource version changed in k8s api: $e") } else { + logInfo(s"Got proper termination code, closing watcher.") closeWatch() } } @@ -146,13 +151,6 @@ private[k8s] class LoggingPodStatusWatcherImpl( }.mkString("") } - override def awaitCompletion(): Unit = { - podCompletedFuture.await() - logInfo(pod.map { p => - s"Container final statuses:\n\n${containersDescription(p)}" - }.getOrElse("No containers were found in the driver pod.")) - } - private def containersDescription(p: Pod): String = { p.getStatus.getContainerStatuses.asScala.map { status => Seq( @@ -191,12 +189,14 @@ private[k8s] class LoggingPodStatusWatcherImpl( } override def watchOrStop(sId: String): Boolean = if (waitForCompletion) { - logInfo(s"Waiting for application ${appId} with submission ID $sId to finish...") + logInfo(s"Patched Sept 8th: Waiting for application" + + s" ${appId} with submission ID $sId to finish...") val interval = maybeLoggingInterval + synchronized { while (!podCompleted && !resourceTooOldReceived) { wait(interval.get) - logInfo(s"Application status for $appId (phase: $phase)") + logDebug(s"Application status for $appId (phase: $phase)") } } @@ -205,7 +205,12 @@ private[k8s] class LoggingPodStatusWatcherImpl( pod.map { p => s"Container final statuses:\n\n${containersDescription(p)}" } .getOrElse("No containers were found in the driver pod.")) logInfo(s"Application ${appId} with submission ID $sId finished") + } else { + logInfo(s"Got HTTP Gone code, resource version changed in k8s api. Creating a new watcher.") } + + logInfo(s"Watcher has stopped, pod completed status: ${podCompleted}") + podCompleted } else { logInfo(s"Deployed Spark application ${appId} with submission ID $sId into Kubernetes") diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala index b1132ed0467f9..ad28b0789aa1e 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala @@ -151,6 +151,7 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter { createdResourcesArgumentCaptor = ArgumentCaptor.forClass(classOf[HasMetadata]) when(podOperations.create(FULL_EXPECTED_POD)).thenReturn(POD_WITH_OWNER_REFERENCE) when(namedPods.watch(loggingPodStatusWatcher)).thenReturn(mock[Watch]) + when(loggingPodStatusWatcher.watchOrStop("default" + ":" + POD_NAME)).thenReturn(true) doReturn(resourceList) .when(kubernetesClient) .resourceList(createdResourcesArgumentCaptor.capture())