-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-24266][K8S][2.4] Restart the watcher when we receive a version changed from k8s #30283
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,12 +17,15 @@ | |
| package org.apache.spark.deploy.k8s.submit | ||
|
|
||
| import java.io.StringWriter | ||
| import java.net.HttpURLConnection.HTTP_GONE | ||
| import java.util.{Collections, UUID} | ||
| import java.util.Properties | ||
|
|
||
| import io.fabric8.kubernetes.api.model._ | ||
| import io.fabric8.kubernetes.client.KubernetesClient | ||
| import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watch} | ||
| import io.fabric8.kubernetes.client.Watcher.Action | ||
| import scala.collection.mutable | ||
| import scala.util.control.Breaks._ | ||
| import scala.util.control.NonFatal | ||
|
|
||
| import org.apache.spark.SparkConf | ||
|
|
@@ -133,29 +136,37 @@ private[spark] class Client( | |
| .endVolume() | ||
| .endSpec() | ||
| .build() | ||
| Utils.tryWithResource( | ||
| kubernetesClient | ||
| .pods() | ||
| .withName(resolvedDriverPod.getMetadata.getName) | ||
| .watch(watcher)) { _ => | ||
| val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod) | ||
| try { | ||
| val otherKubernetesResources = | ||
| resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap) | ||
| addDriverOwnerReference(createdDriverPod, otherKubernetesResources) | ||
| kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace() | ||
| } catch { | ||
| case NonFatal(e) => | ||
| kubernetesClient.pods().delete(createdDriverPod) | ||
| throw e | ||
| } | ||
|
|
||
| if (waitForAppCompletion) { | ||
| logInfo(s"Waiting for application $appName to finish...") | ||
| watcher.awaitCompletion() | ||
| logInfo(s"Application $appName finished.") | ||
| } else { | ||
| logInfo(s"Deployed Spark application $appName into Kubernetes.") | ||
| val driverPodName = resolvedDriverPod.getMetadata.getName | ||
| var watch: Watch = null | ||
| val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod) | ||
| try { | ||
| val otherKubernetesResources = resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap) | ||
| addDriverOwnerReference(createdDriverPod, otherKubernetesResources) | ||
| kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace() | ||
| } catch { | ||
| case NonFatal(e) => | ||
| kubernetesClient.pods().delete(createdDriverPod) | ||
| throw e | ||
| } | ||
| val sId = Seq(kubernetesConf.namespace(), driverPodName).mkString(":") | ||
| breakable { | ||
| while (true) { | ||
| val podWithName = kubernetesClient | ||
| .pods() | ||
| .withName(driverPodName) | ||
|
|
||
| watcher.reset() | ||
|
|
||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's remove this empty line like the other branch. |
||
| watch = podWithName.watch(watcher) | ||
|
|
||
dongjoon-hyun marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| watcher.eventReceived(Action.MODIFIED, podWithName.get()) | ||
|
|
||
dongjoon-hyun marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| if(watcher.watchOrStop(sId)) { | ||
| logInfo(s"Stop watching as the pod has completed.") | ||
|
||
| watch.close() | ||
| break | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
@@ -230,7 +241,9 @@ private[spark] class KubernetesClientApplication extends SparkApplication { | |
| val master = KubernetesUtils.parseMasterUrl(sparkConf.get("spark.master")) | ||
| val loggingInterval = if (waitForAppCompletion) Some(sparkConf.get(REPORT_INTERVAL)) else None | ||
|
|
||
| val watcher = new LoggingPodStatusWatcherImpl(kubernetesAppId, loggingInterval) | ||
| val watcher = new LoggingPodStatusWatcherImpl(kubernetesAppId, | ||
| loggingInterval, | ||
| waitForAppCompletion) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
|
|
||
| Utils.tryWithResource(SparkKubernetesClientFactory.createKubernetesClient( | ||
| master, | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,6 +16,7 @@ | |
| */ | ||
| package org.apache.spark.deploy.k8s.submit | ||
|
|
||
| import java.net.HttpURLConnection.HTTP_GONE | ||
| import java.util.concurrent.{CountDownLatch, TimeUnit} | ||
|
|
||
| import scala.collection.JavaConverters._ | ||
|
|
@@ -28,8 +29,10 @@ import org.apache.spark.SparkException | |
| import org.apache.spark.internal.Logging | ||
| import org.apache.spark.util.ThreadUtils | ||
|
|
||
|
|
||
|
||
| private[k8s] trait LoggingPodStatusWatcher extends Watcher[Pod] { | ||
| def awaitCompletion(): Unit | ||
| def watchOrStop(submissionId: String): Boolean | ||
| def reset(): Unit | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -42,13 +45,20 @@ private[k8s] trait LoggingPodStatusWatcher extends Watcher[Pod] { | |
| */ | ||
| private[k8s] class LoggingPodStatusWatcherImpl( | ||
| appId: String, | ||
| maybeLoggingInterval: Option[Long]) | ||
| maybeLoggingInterval: Option[Long], | ||
| waitForCompletion: Boolean) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| extends LoggingPodStatusWatcher with Logging { | ||
|
|
||
| private var podCompleted = false | ||
dongjoon-hyun marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| private var resourceTooOldReceived: Boolean = false | ||
|
|
||
| private val podCompletedFuture = new CountDownLatch(1) | ||
|
|
||
| // start timer for periodic logging | ||
| private val scheduler = | ||
| ThreadUtils.newDaemonSingleThreadScheduledExecutor("logging-pod-status-watcher") | ||
|
|
||
| private val logRunnable: Runnable = new Runnable { | ||
| override def run() = logShortStatus() | ||
| } | ||
|
|
@@ -77,9 +87,18 @@ private[k8s] class LoggingPodStatusWatcherImpl( | |
| } | ||
| } | ||
|
|
||
| override def reset(): Unit = { | ||
|
||
| resourceTooOldReceived = false | ||
| } | ||
|
|
||
| override def onClose(e: KubernetesClientException): Unit = { | ||
| logDebug(s"Stopping watching application $appId with last-observed phase $phase") | ||
| closeWatch() | ||
| if (e != null && e.getCode==HTTP_GONE) { | ||
|
||
| resourceTooOldReceived = true | ||
| logDebug(s"Got HTTP Gone code, resource version changed in k8s api: $e") | ||
| } else { | ||
| closeWatch() | ||
| } | ||
| } | ||
|
|
||
| private def logShortStatus() = { | ||
|
|
@@ -97,6 +116,7 @@ private[k8s] class LoggingPodStatusWatcherImpl( | |
| private def closeWatch(): Unit = { | ||
| podCompletedFuture.countDown() | ||
| scheduler.shutdown() | ||
| podCompleted = true | ||
| } | ||
|
|
||
| private def formatPodState(pod: Pod): String = { | ||
|
|
@@ -134,13 +154,6 @@ private[k8s] class LoggingPodStatusWatcherImpl( | |
| }.mkString("") | ||
| } | ||
|
|
||
| override def awaitCompletion(): Unit = { | ||
| podCompletedFuture.await() | ||
| logInfo(pod.map { p => | ||
| s"Container final statuses:\n\n${containersDescription(p)}" | ||
| }.getOrElse("No containers were found in the driver pod.")) | ||
| } | ||
|
|
||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This removal looks like a part of independency PR instead of the part of SPARK-24266. Could you tell us why this is required and where this came from?
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @shockdm Could you chime in on this one?
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @dongjoon-hyun It does look like it originates from SPARK-28947 02c5b4f which eliminated the future and was a rename. Since this is a private trait, the logic should be completely self-contained and safe to remove. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @jkleckner thank you for following up, that is correct. Sorry for the late response :( |
||
| private def containersDescription(p: Pod): String = { | ||
| p.getStatus.getContainerStatuses.asScala.map { status => | ||
| Seq( | ||
|
|
@@ -177,4 +190,34 @@ private[k8s] class LoggingPodStatusWatcherImpl( | |
| private def formatTime(time: String): String = { | ||
| if (time != null || time != "") time else "N/A" | ||
| } | ||
|
|
||
| override def watchOrStop(sId: String): Boolean = if (waitForCompletion) { | ||
| logInfo(s"Waiting for application ${conf.appName} with submission ID $sId to finish...") | ||
|
||
| val interval = maybeLoggingInterval | ||
|
|
||
| synchronized { | ||
| while (!podCompleted && !resourceTooOldReceived) { | ||
| wait(interval.get) | ||
| logInfo(s"Application status for $appId (phase: $phase)") | ||
| } | ||
| } | ||
|
|
||
| if(podCompleted) { | ||
| logInfo( | ||
| pod.map { p => s"Container final statuses:\n\n${containersDescription(p)}" } | ||
| .getOrElse("No containers were found in the driver pod.")) | ||
| logInfo(s"Application ${appId} with submission ID $sId finished") | ||
| } else { | ||
| logInfo(s"Got HTTP Gone code, resource version changed in k8s api. Creating a new watcher.") | ||
|
||
| } | ||
|
|
||
| logInfo(s"Watcher has stopped, pod completed status: ${podCompleted}") | ||
|
||
|
|
||
| podCompleted | ||
| } else { | ||
| logInfo(s"Deployed Spark application ${appId} with submission ID $sId into Kubernetes") | ||
dongjoon-hyun marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| logInfo(s"It seems we end up here, because we never want to wait for completion...") | ||
|
||
| // Always act like the application has completed since we don't want to wait for app completion | ||
| true | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -151,6 +151,8 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter { | |
| createdResourcesArgumentCaptor = ArgumentCaptor.forClass(classOf[HasMetadata]) | ||
| when(podOperations.create(FULL_EXPECTED_POD)).thenReturn(POD_WITH_OWNER_REFERENCE) | ||
| when(namedPods.watch(loggingPodStatusWatcher)).thenReturn(mock[Watch]) | ||
| when(loggingPodStatusWatcher.watchOrStop(kubernetesConf.namespace() + ":" + POD_NAME)) | ||
| .thenReturn(true) | ||
| doReturn(resourceList) | ||
| .when(kubernetesClient) | ||
| .resourceList(createdResourcesArgumentCaptor.capture()) | ||
|
|
@@ -205,6 +207,6 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter { | |
| loggingPodStatusWatcher, | ||
| KUBERNETES_RESOURCE_PREFIX) | ||
| submissionClient.run() | ||
| verify(loggingPodStatusWatcher).awaitCompletion() | ||
| verify(loggingPodStatusWatcher).watchOrStop("default:driver") | ||
|
||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you add the following comments like the other branches?