diff --git a/core/src/main/resources/org/apache/spark/ui/static/webui.css b/core/src/main/resources/org/apache/spark/ui/static/webui.css index 319a719efaa7..3c5ef0022c43 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/webui.css +++ b/core/src/main/resources/org/apache/spark/ui/static/webui.css @@ -250,4 +250,9 @@ a.expandbutton { .table-cell-width-limited td { max-width: 600px; -} \ No newline at end of file +} + +.stop-delayed { + color: red; + margin-bottom: 1em; +} diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index 7d31ac54a717..04fc75a89053 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -18,14 +18,14 @@ package org.apache.spark.ui import java.util.{Date, ServiceLoader} +import java.util.concurrent.atomic.AtomicReference import scala.collection.JavaConverters._ import org.apache.spark.{SecurityManager, SparkConf, SparkContext} import org.apache.spark.internal.Logging import org.apache.spark.scheduler._ -import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationAttemptInfo, ApplicationInfo, - UIRoot} +import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationAttemptInfo, ApplicationInfo, UIRoot} import org.apache.spark.storage.StorageStatusListener import org.apache.spark.ui.JettyUtils._ import org.apache.spark.ui.env.{EnvironmentListener, EnvironmentTab} @@ -33,7 +33,7 @@ import org.apache.spark.ui.exec.{ExecutorsListener, ExecutorsTab} import org.apache.spark.ui.jobs.{JobProgressListener, JobsTab, StagesTab} import org.apache.spark.ui.scope.RDDOperationGraphListener import org.apache.spark.ui.storage.{StorageListener, StorageTab} -import org.apache.spark.util.Utils +import org.apache.spark.util.{SignalUtils, Utils} /** * Top level user interface for a Spark application. @@ -58,6 +58,9 @@ private[spark] class SparkUI private ( val killEnabled = sc.map(_.conf.getBoolean("spark.ui.killEnabled", true)).getOrElse(false) + private val stopDelayThread = new AtomicReference[Thread]() + private val stopDelaySeconds = conf.getOption("spark.ui.stopDelay").map(Utils.timeStringAsSeconds) + var appId: String = _ private var streamingJobProgressListener: Option[SparkListener] = None @@ -80,6 +83,9 @@ private[spark] class SparkUI private ( attachHandler(createRedirectHandler( "/stages/stage/kill", "/stages/", stagesTab.handleKillRequest, httpMethods = Set("GET", "POST"))) + attachHandler(createRedirectHandler( + "/proceed-with-ui-stop", "/jobs/", _ => proceedWithStop("web UI action"), + httpMethods = Set("GET", "POST"))) } initialize() @@ -93,10 +99,51 @@ private[spark] class SparkUI private ( appId = id } + def activeStopDelay: Option[Long] = + stopDelaySeconds.filter(_ => stopDelayThread.get() != null) + /** Stop the server behind this web interface. Only valid after bind(). */ override def stop() { - super.stop() - logInfo(s"Stopped Spark web UI at $webUrl") + + def reallyStop(): Unit = { + super.stop() + logInfo(s"Stopped Spark web UI at $webUrl") + } + + stopDelaySeconds match { + case Some(stopDelaySeconds) => + if (stopDelayThread.compareAndSet(null, Thread.currentThread())) { + delayStop(stopDelaySeconds) + reallyStop() + } + case None => + reallyStop() + } + } + + private def delayStop(stopDelaySeconds: Long): Unit = { + + SignalUtils.register("INT") { + proceedWithStop(s"received SIGINT") + true + } + + logInfo(s"Delaying stop of Spark web UI at $webUrl " + + s"for $stopDelaySeconds seconds as set in spark.ui.stopDelay. " + + s"Stopping can be resumed immediately in the Spark UI, or by sending SIGINT.") + + try Thread.sleep(1000 * stopDelaySeconds) + catch { case _: InterruptedException => } + + stopDelayThread.set(null) + Thread.interrupted() + } + + def proceedWithStop(reason: String): Unit = { + Option(stopDelayThread.get).foreach { stopThread => + logInfo(s"Proceeding to stop Spark web UI ($reason)") + stopThread.interrupt() + } } def getSparkUI(appId: String): Option[SparkUI] = { diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala index 18be0870746e..2cea9ed397e2 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala @@ -311,6 +311,24 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") { val summary: NodeSeq =
spark.ui.stopDelayspark.ui.enabled