diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
index 5f5e0fe1c34d..d60314c7608f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
@@ -73,4 +73,7 @@ private[history] abstract class ApplicationHistoryProvider {
   @throws(classOf[SparkException])
   def writeEventLogs(appId: String, attemptId: Option[String], zipStream: ZipOutputStream): Unit
 
+  @throws(classOf[NoSuchElementException])
+  def isCompleted(appId: String, attemptId: Option[String]): Boolean
+
 }
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 718efc4f3bd5..4f9de59cb3fa 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -89,6 +89,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   // List of application logs to be deleted by event log cleaner.
   private var attemptsToClean = new mutable.ListBuffer[FsApplicationAttemptInfo]
 
+  private val isAppCompleted = new mutable.HashMap[String, Boolean]()
+
   /**
    * Return a runnable that performs the given operation on the event logs.
    * This operation is expected to be executed periodically.
@@ -537,6 +539,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     val appCompleted = isApplicationCompleted(eventLog)
     bus.addListener(appListener)
     bus.replay(logInput, logPath.toString, !appCompleted)
+    isAppCompleted.put(logPath.getName(), appCompleted)
 
     // Without an app ID, new logs will render incorrectly in the listing page, so do not list or
     // try to show their UI. Some old versions of Spark generate logs without an app ID, so let
@@ -678,6 +681,18 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     method.invoke(dfs, action).asInstanceOf[Boolean]
   }
 
+  def isCompleted(appId: String, attemptId: Option[String]): Boolean = {
+
+    val name = appId + attemptId.map { id => s"_$id" }.getOrElse("")
+    if (isAppCompleted.keySet.contains(name)) {
+      true
+    } else if (isAppCompleted.contains(name + EventLoggingListener.IN_PROGRESS)) {
+      false
+    } else {
+      throw new NoSuchElementException(s"no app with key $appId/$attemptId.")
+    }
+  }
+
 }
 
 private[history] object FsHistoryProvider {
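For reference, a minimal self-contained sketch (not part of the patch) of the lookup the new isCompleted performs. It assumes the keys recorded during replay mirror event log file names: a completed attempt is stored under appId_attemptId, and an in-progress one under the same name with EventLoggingListener.IN_PROGRESS (".inprogress") appended. The object name and sample keys below are hypothetical.

    // Sketch only: mirrors the key scheme the patch records during replay.
    import scala.collection.mutable

    object CompletionLookupSketch {
      private val IN_PROGRESS = ".inprogress"  // mirrors EventLoggingListener.IN_PROGRESS
      private val isAppCompleted = mutable.HashMap[String, Boolean]()

      // Completed attempts are keyed by the bare log name, in-progress ones by
      // the name plus the ".inprogress" suffix, so key membership alone decides
      // the answer; an unknown name means the provider never replayed that log.
      def isCompleted(appId: String, attemptId: Option[String]): Boolean = {
        val name = appId + attemptId.map(id => s"_$id").getOrElse("")
        if (isAppCompleted.contains(name)) true
        else if (isAppCompleted.contains(name + IN_PROGRESS)) false
        else throw new NoSuchElementException(s"no app with key $appId/$attemptId.")
      }

      def main(args: Array[String]): Unit = {
        isAppCompleted.put("app-1_1", true)                 // finished attempt
        isAppCompleted.put("app-2_1" + IN_PROGRESS, false)  // still running
        println(isCompleted("app-1", Some("1")))  // true
        println(isCompleted("app-2", Some("1")))  // false
      }
    }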
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index d4f327cc588f..3549fb007118 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -32,6 +34,8 @@ import org.apache.spark.ui.{SparkUI, UIUtils, WebUI}
 import org.apache.spark.ui.JettyUtils._
 import org.apache.spark.util.{ShutdownHookManager, SignalLogger, Utils}
 
+import scala.collection.mutable
+
 /**
  * A web server that renders SparkUIs of completed applications.
  *
@@ -50,6 +52,8 @@ class HistoryServer(
     port: Int)
   extends WebUI(securityManager, port, conf) with Logging with UIRoot {
 
+  private val loadedAppStatus = new mutable.HashMap[String, Boolean]()
+
   // How many applications to retain
   private val retainedApplications = conf.getInt("spark.history.retainedApplications", 50)
 
@@ -103,7 +107,7 @@ class HistoryServer(
       // Note we don't use the UI retrieved from the cache; the cache loader above will register
      // the app's UI, and all we need to do is redirect the user to the same URI that was
       // requested, and the proper data should be served at that point.
-      res.sendRedirect(res.encodeRedirectURL(req.getRequestURI()))
+      res.sendRedirect(res.encodeRedirectURL(req.getRequestURI() + "/ui"))
     }
 
     // SPARK-5983 ensure TRACE is not supported
@@ -189,17 +193,19 @@ class HistoryServer(
 
   def getProviderConfig(): Map[String, String] = provider.getConfig()
 
   private def loadAppUi(appId: String, attemptId: Option[String]): Boolean = {
-    try {
-      appCache.get(appId + attemptId.map { id => s"/$id" }.getOrElse(""))
-      true
-    } catch {
-      case e: Exception => e.getCause() match {
-        case nsee: NoSuchElementException =>
-          false
-
-        case cause: Exception => throw cause
-      }
+    val app_attempt_id = appId + attemptId.map { id => s"/$id" }.getOrElse("")
+    loadedAppStatus.get(app_attempt_id) match {
+      case None =>
+        loadedAppStatus.put(app_attempt_id, provider.isCompleted(appId, attemptId))
+        appCache.refresh(app_attempt_id)
+
+      case Some(false) =>
+        loadedAppStatus.update(app_attempt_id, provider.isCompleted(appId, attemptId))
+        appCache.refresh(app_attempt_id)
+
+      case Some(true) => // attempt has completed
     }
+    true
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index 4608bce202ec..fff0ca7a6bbb 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -66,7 +66,11 @@ private[spark] class SparkUI private (
     attachTab(new EnvironmentTab(this))
     attachTab(new ExecutorsTab(this))
     attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
-    attachHandler(createRedirectHandler("/", "/jobs/", basePath = basePath))
+    if (sc.isDefined) {
+      attachHandler(createRedirectHandler("/", "/jobs", basePath = basePath))
+    } else {
+      attachHandler(createRedirectHandler("/ui", "/jobs", basePath = basePath))
+    }
     attachHandler(ApiRootResource.getServletHandler(this))
     // This should be POST only, but, the YARN AM proxy won't proxy POSTs
     attachHandler(createRedirectHandler(
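The loadAppUi rewrite amounts to a refresh-until-completed policy on top of the existing appCache (a Guava LoadingCache in HistoryServer): any attempt not yet known to be complete has its status re-recorded and its cached UI refreshed on every request, while a completed attempt is served from cache untouched. Below is a standalone sketch of that policy, with hypothetical names and the cache reduced to a reload callback.

    // Sketch only: the cache is reduced to a reload callback; names are hypothetical.
    import scala.collection.mutable

    class RefreshUntilCompleted(isCompleted: String => Boolean, reload: String => Unit) {
      private val status = mutable.HashMap[String, Boolean]()

      def load(key: String): Unit = status.get(key) match {
        case Some(true) => // completed: serve the cached UI as-is
        case _ =>          // unknown or still in progress: re-check and reload
          status.update(key, isCompleted(key))
          reload(key)
      }
    }

    object RefreshUntilCompletedDemo {
      def main(args: Array[String]): Unit = {
        var reloads = 0
        val cache = new RefreshUntilCompleted(_ == "done-app", _ => reloads += 1)
        cache.load("running-app")
        cache.load("running-app")  // still running: reloaded again
        cache.load("done-app")     // first sight: reloaded once, marked complete
        cache.load("done-app")     // complete: served from cache, no reload
        println(reloads)           // 3
      }
    }

In the patched HistoryServer the reload callback corresponds to appCache.refresh, so an in-progress UI is rebuilt on each request until the provider reports the attempt complete.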