diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
index edeaa380194ac..83d46ceb3ae97 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
@@ -17,13 +17,16 @@ package org.apache.spark.deploy.k8s.submit
 
 import java.io.StringWriter
+import java.net.HttpURLConnection.HTTP_GONE
 import java.util.{Collections, UUID}
 import java.util.Properties
 
 import io.fabric8.kubernetes.api.model._
-import io.fabric8.kubernetes.client.KubernetesClient
+import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watch}
+import io.fabric8.kubernetes.client.Watcher.Action
 import scala.collection.mutable
 import scala.util.control.NonFatal
+import scala.util.control.Breaks._
 
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.SparkApplication
@@ -133,29 +136,37 @@
         .endVolume()
         .endSpec()
       .build()
 
-    Utils.tryWithResource(
-      kubernetesClient
-        .pods()
-        .withName(resolvedDriverPod.getMetadata.getName)
-        .watch(watcher)) { _ =>
-      val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
-      try {
-        val otherKubernetesResources =
-          resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap)
-        addDriverOwnerReference(createdDriverPod, otherKubernetesResources)
-        kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
-      } catch {
-        case NonFatal(e) =>
-          kubernetesClient.pods().delete(createdDriverPod)
-          throw e
-      }
-      if (waitForAppCompletion) {
-        logInfo(s"Waiting for application $appName to finish...")
-        watcher.awaitCompletion()
-        logInfo(s"Application $appName finished.")
-      } else {
-        logInfo(s"Deployed Spark application $appName into Kubernetes.")
+    val driverPodName = resolvedDriverPod.getMetadata.getName
+    var watch: Watch = null
+    val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
+    try {
+      val otherKubernetesResources = resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap)
+      addDriverOwnerReference(createdDriverPod, otherKubernetesResources)
+      kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
+    } catch {
+      case NonFatal(e) =>
+        kubernetesClient.pods().delete(createdDriverPod)
+        throw e
+    }
+    val sId = Seq(kubernetesConf.namespace(), driverPodName).mkString(":")
+    breakable {
+      while (true) {
+        val podWithName = kubernetesClient
+          .pods()
+          .withName(driverPodName)
+
+        watcher.reset()
+
+        watch = podWithName.watch(watcher)
+
+        watcher.eventReceived(Action.MODIFIED, podWithName.get())
+
+        if (watcher.watchOrStop(sId)) {
+          logInfo("Stop watching as the pod has completed.")
+          watch.close()
+          break
+        }
       }
     }
   }
@@ -230,7 +241,9 @@
     val master = KubernetesUtils.parseMasterUrl(sparkConf.get("spark.master"))
     val loggingInterval = if (waitForAppCompletion) Some(sparkConf.get(REPORT_INTERVAL)) else None
 
-    val watcher = new LoggingPodStatusWatcherImpl(kubernetesAppId, loggingInterval)
+    val watcher = new LoggingPodStatusWatcherImpl(kubernetesAppId,
+      loggingInterval,
+      waitForAppCompletion)
 
     Utils.tryWithResource(SparkKubernetesClientFactory.createKubernetesClient(
       master,
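The change above replaces the single long-lived watch (the old Utils.tryWithResource block) with a loop: whenever the API server closes the watch with HTTP 410 Gone (a stale resourceVersion), the client resets the watcher and subscribes again, and the explicit eventReceived(Action.MODIFIED, podWithName.get()) call replays the pod's current state so a terminal phase reached while no watch was open is not missed. Below is a minimal, self-contained sketch of that re-watch pattern against the fabric8 kubernetes-client 4.x API; the object name, namespace, and pod name are hypothetical placeholders, not values from the patch.

    import java.net.HttpURLConnection.HTTP_GONE

    import io.fabric8.kubernetes.api.model.Pod
    import io.fabric8.kubernetes.client.{DefaultKubernetesClient, KubernetesClientException, Watcher}
    import io.fabric8.kubernetes.client.Watcher.Action

    object RewatchSketch {
      @volatile private var completed = false
      @volatile private var staleWatch = false

      private val watcher = new Watcher[Pod] {
        override def eventReceived(action: Action, pod: Pod): Unit = {
          // Treat a terminal phase as completion, as LoggingPodStatusWatcherImpl does.
          val phase = Option(pod.getStatus).map(_.getPhase).orNull
          if (phase == "Succeeded" || phase == "Failed") completed = true
        }

        override def onClose(e: KubernetesClientException): Unit = {
          // HTTP 410 means our resourceVersion is too old: flag a re-watch
          // instead of treating the close as the end of the application.
          staleWatch = e != null && e.getCode == HTTP_GONE
        }
      }

      def main(args: Array[String]): Unit = {
        val client = new DefaultKubernetesClient()
        val driverPod = client.pods().inNamespace("default").withName("spark-driver")
        try {
          while (!completed) {
            staleWatch = false
            val watch = driverPod.watch(watcher) // (re)subscribe
            while (!completed && !staleWatch) Thread.sleep(1000)
            watch.close() // finished or stale; either way close before looping
          }
        } finally {
          client.close()
        }
      }
    }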
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
index 4a7d3d42d23db..6c596ac34cc53 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.spark.deploy.k8s.submit
 
+import java.net.HttpURLConnection.HTTP_GONE
 import java.util.concurrent.{CountDownLatch, TimeUnit}
 
 import scala.collection.JavaConverters._
@@ -28,8 +29,10 @@
 import org.apache.spark.SparkException
 import org.apache.spark.internal.Logging
 import org.apache.spark.util.ThreadUtils
 
+
 private[k8s] trait LoggingPodStatusWatcher extends Watcher[Pod] {
-  def awaitCompletion(): Unit
+  def reset(): Unit
+  def watchOrStop(sId: String): Boolean
 }
 
 /**
@@ -42,9 +45,12 @@ private[k8s] trait LoggingPodStatusWatcher extends Watcher[Pod] {
  */
 private[k8s] class LoggingPodStatusWatcherImpl(
     appId: String,
-    maybeLoggingInterval: Option[Long])
+    maybeLoggingInterval: Option[Long],
+    waitForCompletion: Boolean)
   extends LoggingPodStatusWatcher with Logging {
 
+  private var resourceTooOldReceived: Boolean = false
+  private var podCompleted = false
   private val podCompletedFuture = new CountDownLatch(1)
   // start timer for periodic logging
   private val scheduler =
@@ -77,9 +83,19 @@
     }
   }
 
+  override def reset(): Unit = {
+    resourceTooOldReceived = false
+  }
+
   override def onClose(e: KubernetesClientException): Unit = {
-    logDebug(s"Stopping watching application $appId with last-observed phase $phase")
-    closeWatch()
+    logInfo(s"Stopping watching application $appId with last-observed phase $phase")
+    if (e != null && e.getCode == HTTP_GONE) {
+      resourceTooOldReceived = true
+      logInfo(s"Got HTTP Gone code, resource version changed in k8s api: $e")
+    } else {
+      logInfo("Got proper termination code, closing watcher.")
+      closeWatch()
+    }
   }
 
   private def logShortStatus() = {
@@ -97,6 +113,7 @@
   private def closeWatch(): Unit = {
     podCompletedFuture.countDown()
     scheduler.shutdown()
+    podCompleted = true
   }
 
   private def formatPodState(pod: Pod): String = {
@@ -134,13 +151,6 @@
     }.mkString("")
   }
 
-  override def awaitCompletion(): Unit = {
-    podCompletedFuture.await()
-    logInfo(pod.map { p =>
-      s"Container final statuses:\n\n${containersDescription(p)}"
-    }.getOrElse("No containers were found in the driver pod."))
-  }
-
   private def containersDescription(p: Pod): String = {
     p.getStatus.getContainerStatuses.asScala.map { status =>
       Seq(
@@ -177,4 +187,35 @@
   private def formatTime(time: String): String = {
     if (time != null || time != "") time else "N/A"
   }
+
+  override def watchOrStop(sId: String): Boolean = if (waitForCompletion) {
+    logInfo(s"Waiting for application $appId with submission ID $sId to finish...")
+    val interval = maybeLoggingInterval
+
+    synchronized {
+      while (!podCompleted && !resourceTooOldReceived) {
+        wait(interval.get)
+        logDebug(s"Application status for $appId (phase: $phase)")
+      }
+    }
+
+    if (podCompleted) {
+      logInfo(
+        pod.map { p => s"Container final statuses:\n\n${containersDescription(p)}" }
+          .getOrElse("No containers were found in the driver pod."))
+      logInfo(s"Application $appId with submission ID $sId finished")
+    } else {
+      logInfo("Got HTTP Gone code, resource version changed in k8s api. Creating a new watcher.")
+    }
+
+    logInfo(s"Watcher has stopped, pod completed status: $podCompleted")
+
+    podCompleted
+  } else {
+    logInfo(s"Deployed Spark application $appId with submission ID $sId into Kubernetes")
+    // Always act like the application has completed since we don't want to wait for app completion
+    true
+  }
 }
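Two flags drive the new watchOrStop contract: podCompleted (set by closeWatch() on a normal termination) makes it return true, while resourceTooOldReceived (set by onClose on HTTP 410) makes it return false so the caller re-creates the watch after reset(). Note that closeWatch() never notifies the monitor, so the synchronized loop only observes either flag once a wait(interval) timeout expires. Below is a compact sketch of that handshake, using a hypothetical PodMonitor class rather than Spark's watcher, and adding the notifyAll() calls that the patch itself leaves to the wait timeout.

    // Hypothetical stand-in for the reset()/watchOrStop() handshake above.
    // intervalMs must be positive: wait(0) would block indefinitely.
    class PodMonitor(intervalMs: Long) {
      private var podCompleted = false
      private var staleWatch = false

      // What a normal close (onClose without HTTP 410) would trigger.
      def markCompleted(): Unit = synchronized {
        podCompleted = true
        notifyAll() // wake the waiter now rather than after a full interval
      }

      // What onClose with e.getCode == HTTP_GONE would trigger.
      def markStale(): Unit = synchronized {
        staleWatch = true
        notifyAll()
      }

      // Equivalent of reset(): clear the stale flag before re-watching.
      def reset(): Unit = synchronized { staleWatch = false }

      // Returns true when the pod finished, false when the watch went stale
      // and the caller should create a new one: watchOrStop's contract.
      def watchOrStop(): Boolean = synchronized {
        while (!podCompleted && !staleWatch) {
          wait(intervalMs) // also wakes periodically, allowing status logging
        }
        podCompleted
      }
    }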
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
index 4d8e79189ff32..ad28b0789aa1e 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
@@ -151,6 +151,7 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
     createdResourcesArgumentCaptor = ArgumentCaptor.forClass(classOf[HasMetadata])
     when(podOperations.create(FULL_EXPECTED_POD)).thenReturn(POD_WITH_OWNER_REFERENCE)
     when(namedPods.watch(loggingPodStatusWatcher)).thenReturn(mock[Watch])
+    when(loggingPodStatusWatcher.watchOrStop("default" + ":" + POD_NAME)).thenReturn(true)
     doReturn(resourceList)
       .when(kubernetesClient)
       .resourceList(createdResourcesArgumentCaptor.capture())
@@ -205,6 +206,6 @@
       loggingPodStatusWatcher,
       KUBERNETES_RESOURCE_PREFIX)
     submissionClient.run()
-    verify(loggingPodStatusWatcher).awaitCompletion()
+    verify(loggingPodStatusWatcher).watchOrStop("default:driver")
   }
 }
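The stub returns true, so Client.run() leaves the breakable loop after one pass, and the verify pins the submission ID format (namespace:driverPodName). To also exercise the HTTP 410 restart path, Mockito's consecutive stubbing could report one stale watch before completion; a hypothetical extra test body, reusing the suite's loggingPodStatusWatcher and namedPods mocks (any additional stubbing, such as namedPods.get(), is omitted):

    // First watchOrStop call simulates a stale watch (HTTP 410), the second
    // reports completion, so the loop in Client.run() executes twice.
    when(loggingPodStatusWatcher.watchOrStop("default:driver")).thenReturn(false, true)
    submissionClient.run()
    // The watch must have been re-created once after the stale pass.
    verify(namedPods, times(2)).watch(loggingPodStatusWatcher)
    verify(loggingPodStatusWatcher, times(2)).watchOrStop("default:driver")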