From 86f8ee882aefe1216fa1617572915d4c824dda67 Mon Sep 17 00:00:00 2001 From: Dmitriy Drinfeld Date: Wed, 2 Sep 2020 11:30:51 -0400 Subject: [PATCH 1/3] [SPARK-24266][K8S][2.4] Restart the watcher when we receive a version changed from k8s This is a backport of #29533 from master. It includes the shockdm/spark/pull/1 which has been squashed and the import review comment include. It has also been rebased to branch-2.4 Address review comments. --- .../submit/KubernetesClientApplication.scala | 61 +++++++++++------- .../k8s/submit/LoggingPodStatusWatcher.scala | 63 ++++++++++++++++--- .../spark/deploy/k8s/submit/ClientSuite.scala | 4 +- 3 files changed, 93 insertions(+), 35 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala index edeaa380194a..ef965953ced2 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala @@ -17,12 +17,15 @@ package org.apache.spark.deploy.k8s.submit import java.io.StringWriter +import java.net.HttpURLConnection.HTTP_GONE import java.util.{Collections, UUID} import java.util.Properties import io.fabric8.kubernetes.api.model._ -import io.fabric8.kubernetes.client.KubernetesClient +import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watch} +import io.fabric8.kubernetes.client.Watcher.Action import scala.collection.mutable +import scala.util.control.Breaks._ import scala.util.control.NonFatal import org.apache.spark.SparkConf @@ -133,29 +136,37 @@ private[spark] class Client( .endVolume() .endSpec() .build() - Utils.tryWithResource( - kubernetesClient - .pods() - .withName(resolvedDriverPod.getMetadata.getName) - .watch(watcher)) { _ => - val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod) - try { - val otherKubernetesResources = - resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap) - addDriverOwnerReference(createdDriverPod, otherKubernetesResources) - kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace() - } catch { - case NonFatal(e) => - kubernetesClient.pods().delete(createdDriverPod) - throw e - } - if (waitForAppCompletion) { - logInfo(s"Waiting for application $appName to finish...") - watcher.awaitCompletion() - logInfo(s"Application $appName finished.") - } else { - logInfo(s"Deployed Spark application $appName into Kubernetes.") + val driverPodName = resolvedDriverPod.getMetadata.getName + var watch: Watch = null + val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod) + try { + val otherKubernetesResources = resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap) + addDriverOwnerReference(createdDriverPod, otherKubernetesResources) + kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace() + } catch { + case NonFatal(e) => + kubernetesClient.pods().delete(createdDriverPod) + throw e + } + val sId = Seq(kubernetesConf.namespace(), driverPodName).mkString(":") + breakable { + while (true) { + val podWithName = kubernetesClient + .pods() + .withName(driverPodName) + + watcher.reset() + + watch = podWithName.watch(watcher) + + watcher.eventReceived(Action.MODIFIED, podWithName.get()) + + if(watcher.watchOrStop(sId)) { + logInfo(s"Stop watching as the pod has completed.") + watch.close() + break + } } } } @@ -230,7 +241,9 @@ private[spark] class KubernetesClientApplication extends SparkApplication { val master = KubernetesUtils.parseMasterUrl(sparkConf.get("spark.master")) val loggingInterval = if (waitForAppCompletion) Some(sparkConf.get(REPORT_INTERVAL)) else None - val watcher = new LoggingPodStatusWatcherImpl(kubernetesAppId, loggingInterval) + val watcher = new LoggingPodStatusWatcherImpl(kubernetesAppId, + loggingInterval, + waitForAppCompletion) Utils.tryWithResource(SparkKubernetesClientFactory.createKubernetesClient( master, diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala index 4a7d3d42d23d..0ff9adcf056c 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala @@ -16,6 +16,7 @@ */ package org.apache.spark.deploy.k8s.submit +import java.net.HttpURLConnection.HTTP_GONE import java.util.concurrent.{CountDownLatch, TimeUnit} import scala.collection.JavaConverters._ @@ -28,8 +29,10 @@ import org.apache.spark.SparkException import org.apache.spark.internal.Logging import org.apache.spark.util.ThreadUtils + private[k8s] trait LoggingPodStatusWatcher extends Watcher[Pod] { - def awaitCompletion(): Unit + def watchOrStop(submissionId: String): Boolean + def reset(): Unit } /** @@ -42,13 +45,20 @@ private[k8s] trait LoggingPodStatusWatcher extends Watcher[Pod] { */ private[k8s] class LoggingPodStatusWatcherImpl( appId: String, - maybeLoggingInterval: Option[Long]) + maybeLoggingInterval: Option[Long], + waitForCompletion: Boolean) extends LoggingPodStatusWatcher with Logging { + private var podCompleted = false + + private var resourceTooOldReceived: Boolean = false + private val podCompletedFuture = new CountDownLatch(1) + // start timer for periodic logging private val scheduler = ThreadUtils.newDaemonSingleThreadScheduledExecutor("logging-pod-status-watcher") + private val logRunnable: Runnable = new Runnable { override def run() = logShortStatus() } @@ -77,9 +87,18 @@ private[k8s] class LoggingPodStatusWatcherImpl( } } + override def reset(): Unit = { + resourceTooOldReceived = false + } + override def onClose(e: KubernetesClientException): Unit = { logDebug(s"Stopping watching application $appId with last-observed phase $phase") - closeWatch() + if (e != null && e.getCode==HTTP_GONE) { + resourceTooOldReceived = true + logDebug(s"Got HTTP Gone code, resource version changed in k8s api: $e") + } else { + closeWatch() + } } private def logShortStatus() = { @@ -97,6 +116,7 @@ private[k8s] class LoggingPodStatusWatcherImpl( private def closeWatch(): Unit = { podCompletedFuture.countDown() scheduler.shutdown() + podCompleted = true } private def formatPodState(pod: Pod): String = { @@ -134,13 +154,6 @@ private[k8s] class LoggingPodStatusWatcherImpl( }.mkString("") } - override def awaitCompletion(): Unit = { - podCompletedFuture.await() - logInfo(pod.map { p => - s"Container final statuses:\n\n${containersDescription(p)}" - }.getOrElse("No containers were found in the driver pod.")) - } - private def containersDescription(p: Pod): String = { p.getStatus.getContainerStatuses.asScala.map { status => Seq( @@ -177,4 +190,34 @@ private[k8s] class LoggingPodStatusWatcherImpl( private def formatTime(time: String): String = { if (time != null || time != "") time else "N/A" } + + override def watchOrStop(sId: String): Boolean = if (waitForCompletion) { + logInfo(s"Waiting for application ${conf.appName} with submission ID $sId to finish...") + val interval = maybeLoggingInterval + + synchronized { + while (!podCompleted && !resourceTooOldReceived) { + wait(interval.get) + logInfo(s"Application status for $appId (phase: $phase)") + } + } + + if(podCompleted) { + logInfo( + pod.map { p => s"Container final statuses:\n\n${containersDescription(p)}" } + .getOrElse("No containers were found in the driver pod.")) + logInfo(s"Application ${appId} with submission ID $sId finished") + } else { + logInfo(s"Got HTTP Gone code, resource version changed in k8s api. Creating a new watcher.") + } + + logInfo(s"Watcher has stopped, pod completed status: ${podCompleted}") + + podCompleted + } else { + logInfo(s"Deployed Spark application ${appId} with submission ID $sId into Kubernetes") + logInfo(s"It seems we end up here, because we never want to wait for completion...") + // Always act like the application has completed since we don't want to wait for app completion + true + } } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala index 4d8e79189ff3..4fdcba49a4e7 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala @@ -151,6 +151,8 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter { createdResourcesArgumentCaptor = ArgumentCaptor.forClass(classOf[HasMetadata]) when(podOperations.create(FULL_EXPECTED_POD)).thenReturn(POD_WITH_OWNER_REFERENCE) when(namedPods.watch(loggingPodStatusWatcher)).thenReturn(mock[Watch]) + when(loggingPodStatusWatcher.watchOrStop(kubernetesConf.namespace() + ":" + POD_NAME)) + .thenReturn(true) doReturn(resourceList) .when(kubernetesClient) .resourceList(createdResourcesArgumentCaptor.capture()) @@ -205,6 +207,6 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter { loggingPodStatusWatcher, KUBERNETES_RESOURCE_PREFIX) submissionClient.run() - verify(loggingPodStatusWatcher).awaitCompletion() + verify(loggingPodStatusWatcher).watchOrStop("default:driver") } } From f01b3d3197da329a270cef772be31a7931dab26c Mon Sep 17 00:00:00 2001 From: Jim Kleckner Date: Thu, 19 Nov 2020 15:55:09 -0800 Subject: [PATCH 2/3] Address review comments --- .../submit/KubernetesClientApplication.scala | 5 +++-- .../k8s/submit/LoggingPodStatusWatcher.scala | 17 +++++------------ .../spark/deploy/k8s/submit/ClientSuite.scala | 2 +- 3 files changed, 9 insertions(+), 15 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala index ef965953ced2..cbda8a76a772 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala @@ -155,15 +155,16 @@ private[spark] class Client( val podWithName = kubernetesClient .pods() .withName(driverPodName) - + // Reset resource to old before we start the watch, this is important for race conditions watcher.reset() watch = podWithName.watch(watcher) + // Send the latest pod state we know to the watcher to make sure we didn't miss anything watcher.eventReceived(Action.MODIFIED, podWithName.get()) + // Break the while loop if the pod is completed or we don't want to wait if(watcher.watchOrStop(sId)) { - logInfo(s"Stop watching as the pod has completed.") watch.close() break } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala index 0ff9adcf056c..f37aa71cc4a0 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala @@ -29,7 +29,6 @@ import org.apache.spark.SparkException import org.apache.spark.internal.Logging import org.apache.spark.util.ThreadUtils - private[k8s] trait LoggingPodStatusWatcher extends Watcher[Pod] { def watchOrStop(submissionId: String): Boolean def reset(): Unit @@ -67,6 +66,10 @@ private[k8s] class LoggingPodStatusWatcherImpl( private def phase: String = pod.map(_.getStatus.getPhase).getOrElse("unknown") + override def reset(): Unit = { + resourceTooOldReceived = false + } + def start(): Unit = { maybeLoggingInterval.foreach { interval => scheduler.scheduleAtFixedRate(logRunnable, 0, interval, TimeUnit.MILLISECONDS) @@ -87,10 +90,6 @@ private[k8s] class LoggingPodStatusWatcherImpl( } } - override def reset(): Unit = { - resourceTooOldReceived = false - } - override def onClose(e: KubernetesClientException): Unit = { logDebug(s"Stopping watching application $appId with last-observed phase $phase") if (e != null && e.getCode==HTTP_GONE) { @@ -192,7 +191,7 @@ private[k8s] class LoggingPodStatusWatcherImpl( } override def watchOrStop(sId: String): Boolean = if (waitForCompletion) { - logInfo(s"Waiting for application ${conf.appName} with submission ID $sId to finish...") + logInfo(s"Waiting for application ${appId} with submission ID $sId to finish...") val interval = maybeLoggingInterval synchronized { @@ -207,16 +206,10 @@ private[k8s] class LoggingPodStatusWatcherImpl( pod.map { p => s"Container final statuses:\n\n${containersDescription(p)}" } .getOrElse("No containers were found in the driver pod.")) logInfo(s"Application ${appId} with submission ID $sId finished") - } else { - logInfo(s"Got HTTP Gone code, resource version changed in k8s api. Creating a new watcher.") } - - logInfo(s"Watcher has stopped, pod completed status: ${podCompleted}") - podCompleted } else { logInfo(s"Deployed Spark application ${appId} with submission ID $sId into Kubernetes") - logInfo(s"It seems we end up here, because we never want to wait for completion...") // Always act like the application has completed since we don't want to wait for app completion true } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala index 4fdcba49a4e7..d997d42e966a 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala @@ -207,6 +207,6 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter { loggingPodStatusWatcher, KUBERNETES_RESOURCE_PREFIX) submissionClient.run() - verify(loggingPodStatusWatcher).watchOrStop("default:driver") + verify(loggingPodStatusWatcher).watchOrStop(kubernetesConf.namespace + ":driver") } } From 1c64c6cbdfd392be490f031c4599d786d831b9c2 Mon Sep 17 00:00:00 2001 From: Jim Kleckner Date: Thu, 19 Nov 2020 17:46:50 -0800 Subject: [PATCH 3/3] One more review change --- .../spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala index f37aa71cc4a0..8f4594190d89 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala @@ -92,7 +92,7 @@ private[k8s] class LoggingPodStatusWatcherImpl( override def onClose(e: KubernetesClientException): Unit = { logDebug(s"Stopping watching application $appId with last-observed phase $phase") - if (e != null && e.getCode==HTTP_GONE) { + if (e != null && e.getCode == HTTP_GONE) { resourceTooOldReceived = true logDebug(s"Got HTTP Gone code, resource version changed in k8s api: $e") } else {