From c915f0ed75ebd634ec4fe547700c93c5e7875384 Mon Sep 17 00:00:00 2001 From: Stijn De Haes Date: Thu, 30 Apr 2020 15:25:47 +0200 Subject: [PATCH 1/4] Restart the watcher when we receive a version changed from k8s --- .../submit/KubernetesClientApplication.scala | 48 +++++++++++-------- .../k8s/submit/LoggingPodStatusWatcher.scala | 7 ++- 2 files changed, 35 insertions(+), 20 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala index b4155fed8aa24..25b6b2c568217 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala @@ -17,13 +17,15 @@ package org.apache.spark.deploy.k8s.submit import java.io.StringWriter +import java.net.HttpURLConnection.HTTP_GONE import java.util.{Collections, UUID} import java.util.Properties import io.fabric8.kubernetes.api.model._ -import io.fabric8.kubernetes.client.KubernetesClient +import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watch} import scala.collection.mutable import scala.util.control.NonFatal +import util.control.Breaks._ import org.apache.spark.SparkConf import org.apache.spark.deploy.SparkApplication @@ -127,25 +129,33 @@ private[spark] class Client( .endSpec() .build() val driverPodName = resolvedDriverPod.getMetadata.getName - Utils.tryWithResource( - kubernetesClient - .pods() - .withName(driverPodName) - .watch(watcher)) { _ => - val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod) - try { - val otherKubernetesResources = - resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap) - addDriverOwnerReference(createdDriverPod, otherKubernetesResources) - kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace() - } catch { - case NonFatal(e) => - kubernetesClient.pods().delete(createdDriverPod) - throw e - } - val sId = Seq(conf.namespace, driverPodName).mkString(":") - watcher.watchOrStop(sId) + var watch: Watch = null + val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod) + try { + val otherKubernetesResources = resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap) + addDriverOwnerReference(createdDriverPod, otherKubernetesResources) + kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace() + } catch { + case NonFatal(e) => + kubernetesClient.pods().delete(createdDriverPod) + throw e + } + val sId = Seq(conf.namespace, driverPodName).mkString(":") + breakable { + while (true) { + try { + watch = kubernetesClient + .pods() + .withName(driverPodName) + .watch(watcher) + watcher.watchOrStop(sId) + break + } catch { + case e: KubernetesClientException if e.getCode == HTTP_GONE => + logInfo("Resource version changed rerunning the watcher") + } + } } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala index ce3c80c0f85b1..92666e5b72844 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala @@ -19,6 +19,7 @@ package org.apache.spark.deploy.k8s.submit import io.fabric8.kubernetes.api.model.Pod import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher} import io.fabric8.kubernetes.client.Watcher.Action +import java.net.HttpURLConnection.HTTP_GONE import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.KubernetesDriverConf @@ -62,7 +63,11 @@ private[k8s] class LoggingPodStatusWatcherImpl(conf: KubernetesDriverConf) override def onClose(e: KubernetesClientException): Unit = { logDebug(s"Stopping watching application $appId with last-observed phase $phase") - closeWatch() + if (e.getCode == HTTP_GONE) { + logDebug("Got HTTP Gone code, resource version changed in k8s api") + } else { + closeWatch() + } } private def logLongStatus(): Unit = { From 14c4cfa0719912d35ab09bc52f650d2627a96960 Mon Sep 17 00:00:00 2001 From: Stijn De Haes Date: Mon, 4 May 2020 08:29:08 +0200 Subject: [PATCH 2/4] Fetch latest state before continuing --- .../deploy/k8s/submit/KubernetesClientApplication.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala index 25b6b2c568217..faaa26b4a3db9 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala @@ -23,6 +23,7 @@ import java.util.Properties import io.fabric8.kubernetes.api.model._ import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watch} +import io.fabric8.kubernetes.client.Watcher.Action import scala.collection.mutable import scala.util.control.NonFatal import util.control.Breaks._ @@ -145,10 +146,9 @@ private[spark] class Client( breakable { while (true) { try { - watch = kubernetesClient - .pods() - .withName(driverPodName) - .watch(watcher) + val podWithName = kubernetesClient.pods().withName(driverPodName) + watch = podWithName.watch(watcher) + watcher.eventReceived(Action.MODIFIED, podWithName.get()) watcher.watchOrStop(sId) break } catch { From 6b3970f9c0e719630ba1eabe0074723fc7125297 Mon Sep 17 00:00:00 2001 From: Stijn De Haes Date: Tue, 5 May 2020 18:47:40 +0200 Subject: [PATCH 3/4] Rewriten the fix Since the HTTP_GONE error does not throw an error. We have to check for state in the watcher if this has happened and restart it after. Also closed the watch if the watcher successfully ended. --- .../submit/KubernetesClientApplication.scala | 26 ++++++++------ .../k8s/submit/LoggingPodStatusWatcher.scala | 35 +++++++++++++------ .../spark/deploy/k8s/submit/ClientSuite.scala | 1 + 3 files changed, 41 insertions(+), 21 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala index faaa26b4a3db9..713d35dcf64f5 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala @@ -17,12 +17,11 @@ package org.apache.spark.deploy.k8s.submit import java.io.StringWriter -import java.net.HttpURLConnection.HTTP_GONE import java.util.{Collections, UUID} import java.util.Properties import io.fabric8.kubernetes.api.model._ -import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watch} +import io.fabric8.kubernetes.client.{KubernetesClient, Watch} import io.fabric8.kubernetes.client.Watcher.Action import scala.collection.mutable import scala.util.control.NonFatal @@ -145,15 +144,20 @@ private[spark] class Client( val sId = Seq(conf.namespace, driverPodName).mkString(":") breakable { while (true) { - try { - val podWithName = kubernetesClient.pods().withName(driverPodName) - watch = podWithName.watch(watcher) - watcher.eventReceived(Action.MODIFIED, podWithName.get()) - watcher.watchOrStop(sId) - break - } catch { - case e: KubernetesClientException if e.getCode == HTTP_GONE => - logInfo("Resource version changed rerunning the watcher") + val podWithName = kubernetesClient + .pods() + .withName(driverPodName) + // Reset resource to old before we start the watch, this is important for race conditions + watcher.reset() + watch = podWithName.watch(watcher) + + // Send the latest pod state we know to the watcher to make sure we didn't miss anything + watcher.eventReceived(Action.MODIFIED, podWithName.get()) + + // Break the while loop if the pod is completed or we don't want to wait + if(watcher.watchOrStop(sId)) { + watch.close() + break } } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala index 92666e5b72844..5d0f895135e83 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala @@ -27,7 +27,8 @@ import org.apache.spark.deploy.k8s.KubernetesUtils._ import org.apache.spark.internal.Logging private[k8s] trait LoggingPodStatusWatcher extends Watcher[Pod] { - def watchOrStop(submissionId: String): Unit + def watchOrStop(submissionId: String): Boolean + def reset(): Unit } /** @@ -43,17 +44,24 @@ private[k8s] class LoggingPodStatusWatcherImpl(conf: KubernetesDriverConf) private var podCompleted = false + private var resourceToOldReceived = false + private var pod = Option.empty[Pod] private def phase: String = pod.map(_.getStatus.getPhase).getOrElse("unknown") + override def reset(): Unit = { + resourceToOldReceived = false + } + override def eventReceived(action: Action, pod: Pod): Unit = { this.pod = Option(pod) action match { case Action.DELETED | Action.ERROR => closeWatch() - case _ => + case a => + logTrace(s"Received action: $a") logLongStatus() if (hasCompleted()) { closeWatch() @@ -63,8 +71,9 @@ private[k8s] class LoggingPodStatusWatcherImpl(conf: KubernetesDriverConf) override def onClose(e: KubernetesClientException): Unit = { logDebug(s"Stopping watching application $appId with last-observed phase $phase") - if (e.getCode == HTTP_GONE) { - logDebug("Got HTTP Gone code, resource version changed in k8s api") + if(e != null && e.getCode == HTTP_GONE) { + resourceToOldReceived = true + logDebug(s"Got HTTP Gone code, resource version changed in k8s api: $e") } else { closeWatch() } @@ -83,20 +92,26 @@ private[k8s] class LoggingPodStatusWatcherImpl(conf: KubernetesDriverConf) this.notifyAll() } - override def watchOrStop(sId: String): Unit = if (conf.get(WAIT_FOR_APP_COMPLETION)) { + override def watchOrStop(sId: String): Boolean = if (conf.get(WAIT_FOR_APP_COMPLETION)) { logInfo(s"Waiting for application ${conf.appName} with submission ID $sId to finish...") val interval = conf.get(REPORT_INTERVAL) synchronized { - while (!podCompleted) { + while (!podCompleted && !resourceToOldReceived) { wait(interval) logInfo(s"Application status for $appId (phase: $phase)") } } - logInfo( - pod.map { p => s"Container final statuses:\n\n${containersDescription(p)}" } - .getOrElse("No containers were found in the driver pod.")) - logInfo(s"Application ${conf.appName} with submission ID $sId finished") + + if(podCompleted) { + logInfo( + pod.map { p => s"Container final statuses:\n\n${containersDescription(p)}" } + .getOrElse("No containers were found in the driver pod.")) + logInfo(s"Application ${conf.appName} with submission ID $sId finished") + } + podCompleted } else { logInfo(s"Deployed Spark application ${conf.appName} with submission ID $sId into Kubernetes") + // Always act like the application has completed since we don't want to wait for app completion + true } } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala index 5d49ac0bbaafa..d9ec3feb526ee 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala @@ -136,6 +136,7 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter { createdResourcesArgumentCaptor = ArgumentCaptor.forClass(classOf[HasMetadata]) when(podOperations.create(FULL_EXPECTED_POD)).thenReturn(POD_WITH_OWNER_REFERENCE) when(namedPods.watch(loggingPodStatusWatcher)).thenReturn(mock[Watch]) + when(loggingPodStatusWatcher.watchOrStop(kconf.namespace + ":" + POD_NAME)).thenReturn(true) doReturn(resourceList) .when(kubernetesClient) .resourceList(createdResourcesArgumentCaptor.capture()) From 51dc743afd3d311b4833079b0c3851ba62c237af Mon Sep 17 00:00:00 2001 From: Stijn De Haes Date: Fri, 19 Jun 2020 12:03:43 +0200 Subject: [PATCH 4/4] Typo fix and revert logging changes --- .../deploy/k8s/submit/LoggingPodStatusWatcher.scala | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala index 5d0f895135e83..aa27a9ef508ca 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala @@ -44,14 +44,14 @@ private[k8s] class LoggingPodStatusWatcherImpl(conf: KubernetesDriverConf) private var podCompleted = false - private var resourceToOldReceived = false + private var resourceTooOldReceived = false private var pod = Option.empty[Pod] private def phase: String = pod.map(_.getStatus.getPhase).getOrElse("unknown") override def reset(): Unit = { - resourceToOldReceived = false + resourceTooOldReceived = false } override def eventReceived(action: Action, pod: Pod): Unit = { @@ -60,8 +60,7 @@ private[k8s] class LoggingPodStatusWatcherImpl(conf: KubernetesDriverConf) case Action.DELETED | Action.ERROR => closeWatch() - case a => - logTrace(s"Received action: $a") + case _ => logLongStatus() if (hasCompleted()) { closeWatch() @@ -72,7 +71,7 @@ private[k8s] class LoggingPodStatusWatcherImpl(conf: KubernetesDriverConf) override def onClose(e: KubernetesClientException): Unit = { logDebug(s"Stopping watching application $appId with last-observed phase $phase") if(e != null && e.getCode == HTTP_GONE) { - resourceToOldReceived = true + resourceTooOldReceived = true logDebug(s"Got HTTP Gone code, resource version changed in k8s api: $e") } else { closeWatch() @@ -96,7 +95,7 @@ private[k8s] class LoggingPodStatusWatcherImpl(conf: KubernetesDriverConf) logInfo(s"Waiting for application ${conf.appName} with submission ID $sId to finish...") val interval = conf.get(REPORT_INTERVAL) synchronized { - while (!podCompleted && !resourceToOldReceived) { + while (!podCompleted && !resourceTooOldReceived) { wait(interval) logInfo(s"Application status for $appId (phase: $phase)") }