3 changes: 3 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
@@ -73,4 +73,7 @@ private[history] abstract class ApplicationHistoryProvider {
   @throws(classOf[SparkException])
   def writeEventLogs(appId: String, attemptId: Option[String], zipStream: ZipOutputStream): Unit
 
+  @throws(classOf[NoSuchElementException])
+  def isCompleted(appId: String, attemptId: Option[String]): Boolean
+
 }
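The new contract reports an unknown application by throwing NoSuchElementException rather than returning a sentinel value. A hedged sketch of a call site (the stand-in Provider trait, its wiring, and the app IDs are invented for illustration):

    object IsCompletedCallSite extends App {
      // Stand-in for an ApplicationHistoryProvider; only the contract matters here.
      trait Provider { def isCompleted(appId: String, attemptId: Option[String]): Boolean }

      val provider: Provider = new Provider {
        def isCompleted(appId: String, attemptId: Option[String]): Boolean =
          if (appId == "app-1") true
          else throw new NoSuchElementException(s"no app with key $appId/$attemptId.")
      }

      // An unknown app surfaces as an exception; a caller may map it to "not loaded".
      def safeIsCompleted(appId: String): Boolean =
        try provider.isCompleted(appId, None)
        catch { case _: NoSuchElementException => false }

      println(safeIsCompleted("app-1"))  // true
      println(safeIsCompleted("app-2"))  // false
    }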
15 changes: 15 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -89,6 +89,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   // List of application logs to be deleted by event log cleaner.
   private var attemptsToClean = new mutable.ListBuffer[FsApplicationAttemptInfo]
 
+  private val isAppCompleted = new mutable.HashMap[String, Boolean]()
+
   /**
    * Return a runnable that performs the given operation on the event logs.
    * This operation is expected to be executed periodically.
@@ -537,6 +539,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       val appCompleted = isApplicationCompleted(eventLog)
       bus.addListener(appListener)
       bus.replay(logInput, logPath.toString, !appCompleted)
+      isAppCompleted.put(logPath.getName(), appCompleted)
 
       // Without an app ID, new logs will render incorrectly in the listing page, so do not list or
       // try to show their UI. Some old versions of Spark generate logs without an app ID, so let
@@ -678,6 +681,18 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     method.invoke(dfs, action).asInstanceOf[Boolean]
   }
 
+  override def isCompleted(appId: String, attemptId: Option[String]): Boolean = {
+    val name = appId + attemptId.map { id => s"_$id" }.getOrElse("")
+    // Completed attempts are keyed by their plain log name; running ones keep .inprogress.
+    if (isAppCompleted.contains(name)) {
+      true
+    } else if (isAppCompleted.contains(name + EventLoggingListener.IN_PROGRESS)) {
+      false
+    } else {
+      throw new NoSuchElementException(s"no app with key $appId/$attemptId.")
+    }
+  }
+
 }
 
 private[history] object FsHistoryProvider {
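The lookup leans entirely on key naming: a completed attempt is stored under its plain event log name, while a running attempt keeps the .inprogress suffix that EventLoggingListener appends to live logs on disk. A self-contained sketch of that scheme (the log names and app IDs are invented; the real code uses EventLoggingListener.IN_PROGRESS for the suffix):

    import scala.collection.mutable

    object KeySchemeDemo extends App {
      val InProgress = ".inprogress"  // mirrors EventLoggingListener.IN_PROGRESS
      val isAppCompleted = new mutable.HashMap[String, Boolean]()

      // What replay() would record for one finished and one live attempt:
      isAppCompleted.put("app-1_attempt1", true)       // completed attempt
      isAppCompleted.put("app-2" + InProgress, false)  // still running

      def isCompleted(appId: String, attemptId: Option[String]): Boolean = {
        val name = appId + attemptId.map { id => s"_$id" }.getOrElse("")
        if (isAppCompleted.contains(name)) true
        else if (isAppCompleted.contains(name + InProgress)) false
        else throw new NoSuchElementException(s"no app with key $appId/$attemptId.")
      }

      println(isCompleted("app-1", Some("attempt1")))  // true
      println(isCompleted("app-2", None))              // false
    }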
30 changes: 18 additions & 12 deletions core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -32,6 +32,8 @@ import org.apache.spark.ui.{SparkUI, UIUtils, WebUI}
 import org.apache.spark.ui.JettyUtils._
 import org.apache.spark.util.{ShutdownHookManager, SignalLogger, Utils}
 
+import scala.collection.mutable
+
 /**
  * A web server that renders SparkUIs of completed applications.
  *
@@ -50,6 +52,8 @@ class HistoryServer(
     port: Int)
   extends WebUI(securityManager, port, conf) with Logging with UIRoot {
 
+  private val loadedAppStatus = new mutable.HashMap[String, Boolean]()
+
   // How many applications to retain
   private val retainedApplications = conf.getInt("spark.history.retainedApplications", 50)
 
@@ -103,7 +107,7 @@
       // Note we don't use the UI retrieved from the cache; the cache loader above will register
       // the app's UI, and all we need to do is redirect the user to the same URI that was
       // requested, and the proper data should be served at that point.
-      res.sendRedirect(res.encodeRedirectURL(req.getRequestURI()))
+      res.sendRedirect(res.encodeRedirectURL(req.getRequestURI() + "/ui"))
     }
 
     // SPARK-5983 ensure TRACE is not supported
@@ -189,17 +193,19 @@
   def getProviderConfig(): Map[String, String] = provider.getConfig()
 
   private def loadAppUi(appId: String, attemptId: Option[String]): Boolean = {
-    try {
-      appCache.get(appId + attemptId.map { id => s"/$id" }.getOrElse(""))
-      true
-    } catch {
-      case e: Exception => e.getCause() match {
-        case nsee: NoSuchElementException =>
-          false
-
-        case cause: Exception => throw cause
-      }
-    }
+    val appAttemptId = appId + attemptId.map { id => s"/$id" }.getOrElse("")
+    loadedAppStatus.get(appAttemptId) match {
+      case None =>
+        loadedAppStatus.put(appAttemptId, provider.isCompleted(appId, attemptId))
+        appCache.refresh(appAttemptId)
+
+      case Some(false) =>
+        loadedAppStatus.update(appAttemptId, provider.isCompleted(appId, attemptId))
+        appCache.refresh(appAttemptId)
+
+      case Some(true) => // attempt has completed; cached UI remains valid
+    }
+    true
   }
 
 }
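The switch from appCache.get to appCache.refresh is the heart of this method: get leaves an already-cached entry untouched, while Guava's LoadingCache.refresh re-runs the loader for a cached key, which is what a still-running application needs so its UI picks up newly replayed events. A minimal sketch of that difference, assuming appCache is a Guava LoadingCache as in HistoryServer (the loader below is invented):

    import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

    object RefreshDemo extends App {
      var loads = 0
      val cache: LoadingCache[String, String] =
        CacheBuilder.newBuilder().build(new CacheLoader[String, String] {
          override def load(key: String): String = { loads += 1; s"ui-for-$key#$loads" }
        })

      println(cache.get("app-1/attempt1"))  // loader runs: ui-for-app-1/attempt1#1
      println(cache.get("app-1/attempt1"))  // cached, loader not re-run: same value
      cache.refresh("app-1/attempt1")       // forces the loader to run again
      println(cache.get("app-1/attempt1"))  // ui-for-app-1/attempt1#2
    }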
6 changes: 5 additions & 1 deletion core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -66,7 +66,11 @@ private[spark] class SparkUI private (
     attachTab(new EnvironmentTab(this))
     attachTab(new ExecutorsTab(this))
     attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
-    attachHandler(createRedirectHandler("/", "/jobs/", basePath = basePath))
+    if (sc.isDefined) {
+      attachHandler(createRedirectHandler("/", "/jobs", basePath = basePath))
+    } else {
+      attachHandler(createRedirectHandler("/ui", "/jobs", basePath = basePath))
+    }
     attachHandler(ApiRootResource.getServletHandler(this))
     // This should be POST only, but, the YARN AM proxy won't proxy POSTs
     attachHandler(createRedirectHandler(
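Taken together with the HistoryServer change above, a history-served UI is now reached in two hops: the loader servlet redirects the requested URI to <uri>/ui, and the UI's own handler sends /ui on to /jobs, while a live UI (one with a SparkContext) still redirects its root straight to /jobs. A toy sketch of that routing decision (names invented; the real registration goes through JettyUtils.createRedirectHandler):

    object RedirectDemo extends App {
      // Which root redirect a SparkUI registers, keyed on whether a live SparkContext exists.
      def rootRedirect(hasLiveContext: Boolean): (String, String) =
        if (hasLiveContext) "/" -> "/jobs"  // live UI: root goes straight to the jobs page
        else "/ui" -> "/jobs"               // history UI: explicit /ui entry point

      println(rootRedirect(hasLiveContext = true))   // (/,/jobs)
      println(rootRedirect(hasLiveContext = false))  // (/ui,/jobs)
    }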