-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-24266][K8S][2.4] Restart the watcher when we receive a version changed from k8s #30283
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,12 +17,15 @@ | |
| package org.apache.spark.deploy.k8s.submit | ||
|
|
||
| import java.io.StringWriter | ||
| import java.net.HttpURLConnection.HTTP_GONE | ||
| import java.util.{Collections, UUID} | ||
| import java.util.Properties | ||
|
|
||
| import io.fabric8.kubernetes.api.model._ | ||
| import io.fabric8.kubernetes.client.KubernetesClient | ||
| import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watch} | ||
| import io.fabric8.kubernetes.client.Watcher.Action | ||
| import scala.collection.mutable | ||
| import scala.util.control.Breaks._ | ||
| import scala.util.control.NonFatal | ||
|
|
||
| import org.apache.spark.SparkConf | ||
|
|
@@ -133,29 +136,38 @@ private[spark] class Client( | |
| .endVolume() | ||
| .endSpec() | ||
| .build() | ||
| Utils.tryWithResource( | ||
| kubernetesClient | ||
| .pods() | ||
| .withName(resolvedDriverPod.getMetadata.getName) | ||
| .watch(watcher)) { _ => | ||
| val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod) | ||
| try { | ||
| val otherKubernetesResources = | ||
| resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap) | ||
| addDriverOwnerReference(createdDriverPod, otherKubernetesResources) | ||
| kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace() | ||
| } catch { | ||
| case NonFatal(e) => | ||
| kubernetesClient.pods().delete(createdDriverPod) | ||
| throw e | ||
| } | ||
|
|
||
| if (waitForAppCompletion) { | ||
| logInfo(s"Waiting for application $appName to finish...") | ||
| watcher.awaitCompletion() | ||
| logInfo(s"Application $appName finished.") | ||
| } else { | ||
| logInfo(s"Deployed Spark application $appName into Kubernetes.") | ||
| val driverPodName = resolvedDriverPod.getMetadata.getName | ||
| var watch: Watch = null | ||
| val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod) | ||
| try { | ||
| val otherKubernetesResources = resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap) | ||
| addDriverOwnerReference(createdDriverPod, otherKubernetesResources) | ||
| kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace() | ||
| } catch { | ||
| case NonFatal(e) => | ||
| kubernetesClient.pods().delete(createdDriverPod) | ||
| throw e | ||
| } | ||
| val sId = Seq(kubernetesConf.namespace(), driverPodName).mkString(":") | ||
| breakable { | ||
| while (true) { | ||
| val podWithName = kubernetesClient | ||
| .pods() | ||
| .withName(driverPodName) | ||
| // Reset the "resource too old" flag before we start the watch; this is important to avoid race conditions | ||
| watcher.reset() | ||
|
|
||
| watch = podWithName.watch(watcher) | ||
|
|
||
dongjoon-hyun marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| // Send the latest pod state we know to the watcher to make sure we didn't miss anything | ||
| watcher.eventReceived(Action.MODIFIED, podWithName.get()) | ||
|
|
||
dongjoon-hyun marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| // Break the while loop if the pod is completed or we don't want to wait | ||
| if(watcher.watchOrStop(sId)) { | ||
| watch.close() | ||
| break | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
@@ -230,7 +242,9 @@ private[spark] class KubernetesClientApplication extends SparkApplication { | |
| val master = KubernetesUtils.parseMasterUrl(sparkConf.get("spark.master")) | ||
| val loggingInterval = if (waitForAppCompletion) Some(sparkConf.get(REPORT_INTERVAL)) else None | ||
|
|
||
| val watcher = new LoggingPodStatusWatcherImpl(kubernetesAppId, loggingInterval) | ||
| val watcher = new LoggingPodStatusWatcherImpl(kubernetesAppId, | ||
| loggingInterval, | ||
| waitForAppCompletion) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
|
|
||
| Utils.tryWithResource(SparkKubernetesClientFactory.createKubernetesClient( | ||
| master, | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,6 +16,7 @@ | |
| */ | ||
| package org.apache.spark.deploy.k8s.submit | ||
|
|
||
| import java.net.HttpURLConnection.HTTP_GONE | ||
| import java.util.concurrent.{CountDownLatch, TimeUnit} | ||
|
|
||
| import scala.collection.JavaConverters._ | ||
|
|
@@ -29,7 +30,8 @@ import org.apache.spark.internal.Logging | |
| import org.apache.spark.util.ThreadUtils | ||
|
|
||
| private[k8s] trait LoggingPodStatusWatcher extends Watcher[Pod] { | ||
| def awaitCompletion(): Unit | ||
| def watchOrStop(submissionId: String): Boolean | ||
| def reset(): Unit | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -42,13 +44,20 @@ private[k8s] trait LoggingPodStatusWatcher extends Watcher[Pod] { | |
| */ | ||
| private[k8s] class LoggingPodStatusWatcherImpl( | ||
| appId: String, | ||
| maybeLoggingInterval: Option[Long]) | ||
| maybeLoggingInterval: Option[Long], | ||
| waitForCompletion: Boolean) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| extends LoggingPodStatusWatcher with Logging { | ||
|
|
||
| private var podCompleted = false | ||
dongjoon-hyun marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| private var resourceTooOldReceived: Boolean = false | ||
|
|
||
| private val podCompletedFuture = new CountDownLatch(1) | ||
|
|
||
| // start timer for periodic logging | ||
| private val scheduler = | ||
| ThreadUtils.newDaemonSingleThreadScheduledExecutor("logging-pod-status-watcher") | ||
|
|
||
| private val logRunnable: Runnable = new Runnable { | ||
| override def run() = logShortStatus() | ||
| } | ||
|
|
@@ -57,6 +66,10 @@ private[k8s] class LoggingPodStatusWatcherImpl( | |
|
|
||
| private def phase: String = pod.map(_.getStatus.getPhase).getOrElse("unknown") | ||
|
|
||
| override def reset(): Unit = { | ||
| resourceTooOldReceived = false | ||
| } | ||
|
|
||
| def start(): Unit = { | ||
| maybeLoggingInterval.foreach { interval => | ||
| scheduler.scheduleAtFixedRate(logRunnable, 0, interval, TimeUnit.MILLISECONDS) | ||
|
|
@@ -79,7 +92,12 @@ private[k8s] class LoggingPodStatusWatcherImpl( | |
|
|
||
| override def onClose(e: KubernetesClientException): Unit = { | ||
| logDebug(s"Stopping watching application $appId with last-observed phase $phase") | ||
| closeWatch() | ||
| if (e != null && e.getCode == HTTP_GONE) { | ||
| resourceTooOldReceived = true | ||
| logDebug(s"Got HTTP Gone code, resource version changed in k8s api: $e") | ||
| } else { | ||
| closeWatch() | ||
| } | ||
| } | ||
|
|
||
| private def logShortStatus() = { | ||
|
|
@@ -97,6 +115,7 @@ private[k8s] class LoggingPodStatusWatcherImpl( | |
| private def closeWatch(): Unit = { | ||
| podCompletedFuture.countDown() | ||
| scheduler.shutdown() | ||
| podCompleted = true | ||
| } | ||
|
|
||
| private def formatPodState(pod: Pod): String = { | ||
|
|
@@ -134,13 +153,6 @@ private[k8s] class LoggingPodStatusWatcherImpl( | |
| }.mkString("") | ||
| } | ||
|
|
||
| override def awaitCompletion(): Unit = { | ||
| podCompletedFuture.await() | ||
| logInfo(pod.map { p => | ||
| s"Container final statuses:\n\n${containersDescription(p)}" | ||
| }.getOrElse("No containers were found in the driver pod.")) | ||
| } | ||
|
|
||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This removal looks like part of an independent PR rather than part of SPARK-24266. Could you tell us why this is required and where this came from?
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @shockdm Could you chime in on this one?
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @dongjoon-hyun It does look like it originates from SPARK-28947 02c5b4f which eliminated the future and was a rename. Since this is a private trait, the logic should be completely self-contained and safe to remove. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @jkleckner thank you for following up, that is correct. Sorry for the late response :( |
||
| private def containersDescription(p: Pod): String = { | ||
| p.getStatus.getContainerStatuses.asScala.map { status => | ||
| Seq( | ||
|
|
@@ -177,4 +189,28 @@ private[k8s] class LoggingPodStatusWatcherImpl( | |
| private def formatTime(time: String): String = { | ||
| if (time != null || time != "") time else "N/A" | ||
| } | ||
|
|
||
| override def watchOrStop(sId: String): Boolean = if (waitForCompletion) { | ||
| logInfo(s"Waiting for application ${appId} with submission ID $sId to finish...") | ||
| val interval = maybeLoggingInterval | ||
|
|
||
| synchronized { | ||
| while (!podCompleted && !resourceTooOldReceived) { | ||
| wait(interval.get) | ||
| logInfo(s"Application status for $appId (phase: $phase)") | ||
| } | ||
| } | ||
|
|
||
| if(podCompleted) { | ||
| logInfo( | ||
| pod.map { p => s"Container final statuses:\n\n${containersDescription(p)}" } | ||
| .getOrElse("No containers were found in the driver pod.")) | ||
| logInfo(s"Application ${appId} with submission ID $sId finished") | ||
| } | ||
| podCompleted | ||
| } else { | ||
| logInfo(s"Deployed Spark application ${appId} with submission ID $sId into Kubernetes") | ||
dongjoon-hyun marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| // Always act like the application has completed since we don't want to wait for app completion | ||
| true | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's remove this empty line like the other branch.