diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
index 5f5e0fe1c34d..ab462684a91a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
@@ -73,4 +73,9 @@ private[history] abstract class ApplicationHistoryProvider {
   @throws(classOf[SparkException])
   def writeEventLogs(appId: String, attemptId: Option[String], zipStream: ZipOutputStream): Unit

+  /**
+   * @return whether the application with the given id has completed
+   */
+  def getAppStatus(appId: String): Boolean
+
 }
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 5427a88f32ff..20c70742312c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -83,6 +83,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   // List of application logs to be deleted by event log cleaner.
   private var attemptsToClean = new mutable.ListBuffer[FsApplicationAttemptInfo]

+  // Completion status of each replayed event log, keyed by log file name.
+  private val appStatus = new mutable.HashMap[String, Boolean]()
+
   // Constants used to parse Spark 1.0.0 log directories.
   private[history] val LOG_PREFIX = "EVENT_LOG_"
   private[history] val SPARK_VERSION_PREFIX = EventLoggingListener.SPARK_VERSION_KEY + "_"
@@ -445,6 +448,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       val appCompleted = isApplicationCompleted(eventLog)
       bus.addListener(appListener)
       bus.replay(logInput, logPath.toString, !appCompleted)
+      appStatus.put(logPath.getName(), appCompleted)
       new FsApplicationAttemptInfo(
         logPath.getName(),
         appListener.appName.getOrElse(NOT_STARTED),
@@ -529,6 +533,21 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     }
   }

+  // Completed logs are keyed by their plain file name; in-progress logs still carry
+  // the EventLoggingListener.IN_PROGRESS suffix.
+  override def getAppStatus(appId: String): Boolean = {
+    if (appStatus.contains(appId)) {
+      true
+    } else if (appStatus.contains(appId + EventLoggingListener.IN_PROGRESS)) {
+      false
+    } else {
+      // HistoryServer.loadAppUi matches on getCause(), so attach a cause of the same type.
+      val e = new NoSuchElementException(s"no app with key $appId.")
+      e.initCause(new NoSuchElementException(appId))
+      throw e
+    }
+  }
+
 }

 private object FsHistoryProvider {
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index 10638afb7490..4942a9093f80 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -32,6 +32,8 @@ import org.apache.spark.ui.{SparkUI, UIUtils, WebUI}
 import org.apache.spark.ui.JettyUtils._
 import org.apache.spark.util.{SignalLogger, Utils}

+import scala.collection.mutable
+
 /**
  * A web server that renders SparkUIs of completed applications.
 *
@@ -50,6 +52,9 @@ class HistoryServer(
     port: Int)
   extends WebUI(securityManager, port, conf) with Logging with UIRoot {

+  // Last-known completion status of each loaded app UI, keyed by "appId[_attemptId]".
+  private val loadedAppStatus = new mutable.HashMap[String, Boolean]()
+
   // How many applications to retain
   private val retainedApplications = conf.getInt("spark.history.retainedApplications", 50)

@@ -103,7 +108,7 @@ class HistoryServer(
       // Note we don't use the UI retrieved from the cache; the cache loader above will register
       // the app's UI, and all we need to do is redirect the user to the same URI that was
       // requested, and the proper data should be served at that point.
-      res.sendRedirect(res.encodeRedirectURL(req.getRequestURI()))
+      res.sendRedirect(res.encodeRedirectURL(req.getRequestURI() + "/ui"))
     }

     // SPARK-5983 ensure TRACE is not supported
@@ -190,7 +195,13 @@ class HistoryServer(

   private def loadAppUi(appId: String, attemptId: Option[String]): Boolean = {
     try {
-      appCache.get(appId + attemptId.map { id => s"/$id" }.getOrElse(""))
+      val appKey = appId + attemptId.map { id => s"_$id" }.getOrElse("")
+      val cacheKey = appId + attemptId.map { id => s"/$id" }.getOrElse("")
+      // (Re)load apps that are unseen or were still running when last checked.
+      if (!loadedAppStatus.getOrElse(appKey, false)) {
+        loadedAppStatus.put(appKey, provider.getAppStatus(appKey))
+        appCache.refresh(cacheKey)
+      }
       true
     } catch {
       case e: Exception => e.getCause() match {
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index 3788916cf39b..ed4648bf579d 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -64,7 +64,13 @@ private[spark] class SparkUI private (
     attachTab(new EnvironmentTab(this))
     attachTab(new ExecutorsTab(this))
     attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
-    attachHandler(createRedirectHandler("/", "/jobs", basePath = basePath))
+    // With no live SparkContext this UI is served by the history server, which links
+    // to "<app>/ui", so attach the jobs redirect there instead of at the root.
+    if (sc.isDefined) {
+      attachHandler(createRedirectHandler("/", "/jobs", basePath = basePath))
+    } else {
+      attachHandler(createRedirectHandler("/ui", "/jobs", basePath = basePath))
+    }
     attachHandler(ApiRootResource.getServletHandler(this))
     // This should be POST only, but, the YARN AM proxy won't proxy POSTs
     attachHandler(createRedirectHandler(
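
Note (editor's sketch, not part of the patch): the loadAppUi change gates Guava cache
refreshes on the last-seen completion status, so a completed app's UI stays cached while
an in-progress app's UI is refreshed on every visit. A minimal, self-contained Scala
sketch of that gating follows; RefreshGateDemo, shouldRefresh, and the isCompleted
lookup are hypothetical names standing in for HistoryServer.loadedAppStatus and
provider.getAppStatus.

import scala.collection.mutable

object RefreshGateDemo {
  // Last-known completion status per app, mirroring loadedAppStatus above.
  private val loadedAppStatus = new mutable.HashMap[String, Boolean]()

  // Returns true when the UI should be (re)loaded: the app is unseen, or it was
  // still running the last time we checked.
  def shouldRefresh(appKey: String, isCompleted: String => Boolean): Boolean = {
    if (!loadedAppStatus.getOrElse(appKey, false)) {
      loadedAppStatus.put(appKey, isCompleted(appKey))
      true
    } else {
      false
    }
  }

  def main(args: Array[String]): Unit = {
    var completed = false
    val lookup = (_: String) => completed
    println(shouldRefresh("app-1", lookup)) // true: first visit, app still running
    println(shouldRefresh("app-1", lookup)) // true: last known status was "running"
    completed = true
    println(shouldRefresh("app-1", lookup)) // true: re-checks and records "completed"
    println(shouldRefresh("app-1", lookup)) // false: completed UIs stay cached
  }
}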