diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 6f25d346e6e54..861c02858a5bd 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -54,6 +54,7 @@ import org.apache.spark.rpc.RpcEndpointRef import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, StandaloneSchedulerBackend} import org.apache.spark.scheduler.local.LocalSchedulerBackend +import org.apache.spark.status.AppStatusStore import org.apache.spark.storage._ import org.apache.spark.storage.BlockManagerMessages.TriggerThreadDump import org.apache.spark.ui.{ConsoleProgressBar, SparkUI} @@ -213,6 +214,7 @@ class SparkContext(config: SparkConf) extends Logging { private var _jars: Seq[String] = _ private var _files: Seq[String] = _ private var _shutdownHookRef: AnyRef = _ + private var _statusStore: AppStatusStore = _ /* ------------------------------------------------------------------------------------- * | Accessors and public fields. These provide access to the internal state of the | @@ -421,6 +423,10 @@ class SparkContext(config: SparkConf) extends Logging { _jobProgressListener = new JobProgressListener(_conf) listenerBus.addToStatusQueue(jobProgressListener) + // Initialize the app status store and listener before SparkEnv is created so that it gets + // all events. + _statusStore = AppStatusStore.createLiveStore(conf, listenerBus) + // Create the Spark execution environment (cache, map output tracker, etc) _env = createSparkEnv(_conf, isLocal, listenerBus) SparkEnv.set(_env) @@ -442,8 +448,12 @@ class SparkContext(config: SparkConf) extends Logging { _ui = if (conf.getBoolean("spark.ui.enabled", true)) { - Some(SparkUI.createLiveUI(this, _conf, _jobProgressListener, - _env.securityManager, appName, startTime = startTime)) + Some(SparkUI.create(Some(this), _statusStore, _conf, + l => listenerBus.addToStatusQueue(l), + _env.securityManager, + appName, + "", + startTime)) } else { // For tests, do not enable the UI None @@ -1939,6 +1949,9 @@ class SparkContext(config: SparkConf) extends Logging { } SparkEnv.set(null) } + if (_statusStore != null) { + _statusStore.close() + } // Clear this `InheritableThreadLocal`, or it will still be inherited in child threads even this // `SparkContext` is stopped. 
localProperties.remove() diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala index a370526c46f3d..8c63fa65b40fd 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala @@ -18,14 +18,15 @@ package org.apache.spark.deploy.history import java.util.NoSuchElementException +import java.util.concurrent.ExecutionException import javax.servlet.{DispatcherType, Filter, FilterChain, FilterConfig, ServletException, ServletRequest, ServletResponse} import javax.servlet.http.{HttpServletRequest, HttpServletResponse} import scala.collection.JavaConverters._ -import scala.util.control.NonFatal import com.codahale.metrics.{Counter, MetricRegistry, Timer} import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache, RemovalListener, RemovalNotification} +import com.google.common.util.concurrent.UncheckedExecutionException import org.eclipse.jetty.servlet.FilterHolder import org.apache.spark.internal.Logging @@ -34,18 +35,11 @@ import org.apache.spark.ui.SparkUI import org.apache.spark.util.Clock /** - * Cache for applications. + * Cache for application UIs. * - * Completed applications are cached for as long as there is capacity for them. - * Incompleted applications have their update time checked on every - * retrieval; if the cached entry is out of date, it is refreshed. + * Applications are cached for as long as there is capacity for them. See [[LoadedAppUI]] for a + * discussion of the UI lifecycle. * - * @note there must be only one instance of [[ApplicationCache]] in a - * JVM at a time. This is because a static field in [[ApplicationCacheCheckFilterRelay]] - * keeps a reference to the cache so that HTTP requests on the attempt-specific web UIs - * can probe the current cache to see if the attempts have changed. - * - * Creating multiple instances will break this routing. * @param operations implementation of record access operations * @param retainedApplications number of retained applications * @param clock time source @@ -55,9 +49,6 @@ private[history] class ApplicationCache( val retainedApplications: Int, val clock: Clock) extends Logging { - /** - * Services the load request from the cache. - */ private val appLoader = new CacheLoader[CacheKey, CacheEntry] { /** the cache key doesn't match a cached entry, or the entry is out-of-date, so load it. */ @@ -67,9 +58,6 @@ private[history] class ApplicationCache( } - /** - * Handler for callbacks from the cache of entry removal. - */ private val removalListener = new RemovalListener[CacheKey, CacheEntry] { /** @@ -80,16 +68,11 @@ private[history] class ApplicationCache( metrics.evictionCount.inc() val key = rm.getKey logDebug(s"Evicting entry ${key}") - operations.detachSparkUI(key.appId, key.attemptId, rm.getValue().ui) + operations.detachSparkUI(key.appId, key.attemptId, rm.getValue().loadedUI.ui) } } - /** - * The cache of applications. - * - * Tagged as `protected` so as to allow subclasses in tests to access it directly - */ - protected val appCache: LoadingCache[CacheKey, CacheEntry] = { + private val appCache: LoadingCache[CacheKey, CacheEntry] = { CacheBuilder.newBuilder() .maximumSize(retainedApplications) .removalListener(removalListener) @@ -101,151 +84,51 @@ private[history] class ApplicationCache( */ val metrics = new CacheMetrics("history.cache") - init() - - /** - * Perform any startup operations. 
- * - * This includes declaring this instance as the cache to use in the - * [[ApplicationCacheCheckFilterRelay]]. - */ - private def init(): Unit = { - ApplicationCacheCheckFilterRelay.setApplicationCache(this) - } - - /** - * Stop the cache. - * This will reset the relay in [[ApplicationCacheCheckFilterRelay]]. - */ - def stop(): Unit = { - ApplicationCacheCheckFilterRelay.resetApplicationCache() - } - - /** - * Get an entry. - * - * Cache fetch/refresh will have taken place by the time this method returns. - * @param appAndAttempt application to look up in the format needed by the history server web UI, - * `appId/attemptId` or `appId`. - * @return the entry - */ - def get(appAndAttempt: String): SparkUI = { - val parts = splitAppAndAttemptKey(appAndAttempt) - get(parts._1, parts._2) - } - - /** - * Get the Spark UI, converting a lookup failure from an exception to `None`. - * @param appAndAttempt application to look up in the format needed by the history server web UI, - * `appId/attemptId` or `appId`. - * @return the entry - */ - def getSparkUI(appAndAttempt: String): Option[SparkUI] = { + def get(appId: String, attemptId: Option[String] = None): CacheEntry = { try { - val ui = get(appAndAttempt) - Some(ui) + appCache.get(new CacheKey(appId, attemptId)) } catch { - case NonFatal(e) => e.getCause() match { - case nsee: NoSuchElementException => - None - case cause: Exception => throw cause - } + case e @ (_: ExecutionException | _: UncheckedExecutionException) => + throw Option(e.getCause()).getOrElse(e) } } /** - * Get the associated spark UI. - * - * Cache fetch/refresh will have taken place by the time this method returns. - * @param appId application ID - * @param attemptId optional attempt ID - * @return the entry + * Run a closure while holding an application's UI read lock. This prevents the history server + * from closing the UI data store while it's being used. */ - def get(appId: String, attemptId: Option[String]): SparkUI = { - lookupAndUpdate(appId, attemptId)._1.ui - } + def withSparkUI[T](appId: String, attemptId: Option[String])(fn: SparkUI => T): T = { + var entry = get(appId, attemptId) - /** - * Look up the entry; update it if needed. - * @param appId application ID - * @param attemptId optional attempt ID - * @return the underlying cache entry -which can have its timestamp changed, and a flag to - * indicate that the entry has changed - */ - private def lookupAndUpdate(appId: String, attemptId: Option[String]): (CacheEntry, Boolean) = { - metrics.lookupCount.inc() - val cacheKey = CacheKey(appId, attemptId) - var entry = appCache.getIfPresent(cacheKey) - var updated = false - if (entry == null) { - // no entry, so fetch without any post-fetch probes for out-of-dateness - // this will trigger a callback to loadApplicationEntry() - entry = appCache.get(cacheKey) - } else if (!entry.completed) { - val now = clock.getTimeMillis() - log.debug(s"Probing at time $now for updated application $cacheKey -> $entry") - metrics.updateProbeCount.inc() - updated = time(metrics.updateProbeTimer) { - entry.updateProbe() + // If the entry exists, we need to make sure we run the closure with a valid entry. So + // we need to re-try until we can lock a valid entry for read. 
+ entry.loadedUI.lock.readLock().lock() + try { + while (!entry.loadedUI.valid) { + entry.loadedUI.lock.readLock().unlock() + entry = null + try { + invalidate(new CacheKey(appId, attemptId)) + entry = get(appId, attemptId) + metrics.loadCount.inc() + } finally { + if (entry != null) { + entry.loadedUI.lock.readLock().lock() + } + } } - if (updated) { - logDebug(s"refreshing $cacheKey") - metrics.updateTriggeredCount.inc() - appCache.refresh(cacheKey) - // and repeat the lookup - entry = appCache.get(cacheKey) - } else { - // update the probe timestamp to the current time - entry.probeTime = now + + fn(entry.loadedUI.ui) + } finally { + if (entry != null) { + entry.loadedUI.lock.readLock().unlock() } } - (entry, updated) } - /** - * This method is visible for testing. - * - * It looks up the cached entry *and returns a clone of it*. - * This ensures that the cached entries never leak - * @param appId application ID - * @param attemptId optional attempt ID - * @return a new entry with shared SparkUI, but copies of the other fields. - */ - def lookupCacheEntry(appId: String, attemptId: Option[String]): CacheEntry = { - val entry = lookupAndUpdate(appId, attemptId)._1 - new CacheEntry(entry.ui, entry.completed, entry.updateProbe, entry.probeTime) - } - - /** - * Probe for an application being updated. - * @param appId application ID - * @param attemptId attempt ID - * @return true if an update has been triggered - */ - def checkForUpdates(appId: String, attemptId: Option[String]): Boolean = { - val (entry, updated) = lookupAndUpdate(appId, attemptId) - updated - } - - /** - * Size probe, primarily for testing. - * @return size - */ + /** @return Number of cached UIs. */ def size(): Long = appCache.size() - /** - * Emptiness predicate, primarily for testing. - * @return true if the cache is empty - */ - def isEmpty: Boolean = appCache.size() == 0 - - /** - * Time a closure, returning its output. - * @param t timer - * @param f function - * @tparam T type of return value of time - * @return the result of the function. - */ private def time[T](t: Timer)(f: => T): T = { val timeCtx = t.time() try { @@ -272,27 +155,15 @@ private[history] class ApplicationCache( * @throws NoSuchElementException if there is no matching element */ @throws[NoSuchElementException] - def loadApplicationEntry(appId: String, attemptId: Option[String]): CacheEntry = { - + private def loadApplicationEntry(appId: String, attemptId: Option[String]): CacheEntry = { logDebug(s"Loading application Entry $appId/$attemptId") metrics.loadCount.inc() - time(metrics.loadTimer) { + val loadedUI = time(metrics.loadTimer) { + metrics.lookupCount.inc() operations.getAppUI(appId, attemptId) match { - case Some(LoadedAppUI(ui, updateState)) => - val completed = ui.getApplicationInfoList.exists(_.attempts.last.completed) - if (completed) { - // completed spark UIs are attached directly - operations.attachSparkUI(appId, attemptId, ui, completed) - } else { - // incomplete UIs have the cache-check filter put in front of them. - ApplicationCacheCheckFilterRelay.registerFilter(ui, appId, attemptId) - operations.attachSparkUI(appId, attemptId, ui, completed) - } - // build the cache entry - val now = clock.getTimeMillis() - val entry = new CacheEntry(ui, completed, updateState, now) - logDebug(s"Loaded application $appId/$attemptId -> $entry") - entry + case Some(loadedUI) => + logDebug(s"Loaded application $appId/$attemptId") + loadedUI case None => metrics.lookupFailureCount.inc() // guava's cache logs via java.util log, so is of limited use. 
Hence: our own message @@ -301,32 +172,20 @@ private[history] class ApplicationCache( attemptId.map { id => s" attemptId '$id'" }.getOrElse(" and no attempt Id")) } } - } - - /** - * Split up an `applicationId/attemptId` or `applicationId` key into the separate pieces. - * - * @param appAndAttempt combined key - * @return a tuple of the application ID and, if present, the attemptID - */ - def splitAppAndAttemptKey(appAndAttempt: String): (String, Option[String]) = { - val parts = appAndAttempt.split("/") - require(parts.length == 1 || parts.length == 2, s"Invalid app key $appAndAttempt") - val appId = parts(0) - val attemptId = if (parts.length > 1) Some(parts(1)) else None - (appId, attemptId) - } - - /** - * Merge an appId and optional attempt Id into a key of the form `applicationId/attemptId`. - * - * If there is an `attemptId`; `applicationId` if not. - * @param appId application ID - * @param attemptId optional attempt ID - * @return a unified string - */ - def mergeAppAndAttemptToKey(appId: String, attemptId: Option[String]): String = { - appId + attemptId.map { id => s"/$id" }.getOrElse("") + try { + val completed = loadedUI.ui.getApplicationInfoList.exists(_.attempts.last.completed) + if (!completed) { + // incomplete UIs have the cache-check filter put in front of them. + registerFilter(new CacheKey(appId, attemptId), loadedUI) + } + operations.attachSparkUI(appId, attemptId, loadedUI.ui, completed) + new CacheEntry(loadedUI, completed) + } catch { + case e: Exception => + logWarning(s"Failed to initialize application UI for $appId/$attemptId", e) + operations.detachSparkUI(appId, attemptId, loadedUI.ui) + throw e + } } /** @@ -347,6 +206,26 @@ private[history] class ApplicationCache( sb.append("----\n") sb.toString() } + + /** + * Register a filter for the web UI which checks for updates to the given app/attempt + * @param ui Spark UI to attach filters to + * @param appId application ID + * @param attemptId attempt ID + */ + private def registerFilter(key: CacheKey, loadedUI: LoadedAppUI): Unit = { + require(loadedUI != null) + val enumDispatcher = java.util.EnumSet.of(DispatcherType.ASYNC, DispatcherType.REQUEST) + val filter = new ApplicationCacheCheckFilter(key, loadedUI, this) + val holder = new FilterHolder(filter) + require(loadedUI.ui.getHandlers != null, "null handlers") + loadedUI.ui.getHandlers.foreach { handler => + handler.addFilter(holder, "/*", enumDispatcher) + } + } + + def invalidate(key: CacheKey): Unit = appCache.invalidate(key) + } /** @@ -355,19 +234,14 @@ private[history] class ApplicationCache( * @param ui Spark UI * @param completed Flag to indicated that the application has completed (and so * does not need refreshing). - * @param updateProbe function to call to see if the application has been updated and - * therefore that the cached value needs to be refreshed. - * @param probeTime Times in milliseconds when the probe was last executed. 
*/ private[history] final class CacheEntry( - val ui: SparkUI, - val completed: Boolean, - val updateProbe: () => Boolean, - var probeTime: Long) { + val loadedUI: LoadedAppUI, + val completed: Boolean) { /** string value is for test assertions */ override def toString: String = { - s"UI $ui, completed=$completed, probeTime=$probeTime" + s"UI ${loadedUI.ui}, completed=$completed" } } @@ -396,23 +270,17 @@ private[history] class CacheMetrics(prefix: String) extends Source { val evictionCount = new Counter() val loadCount = new Counter() val loadTimer = new Timer() - val updateProbeCount = new Counter() - val updateProbeTimer = new Timer() - val updateTriggeredCount = new Counter() /** all the counters: for registration and string conversion. */ private val counters = Seq( ("lookup.count", lookupCount), ("lookup.failure.count", lookupFailureCount), ("eviction.count", evictionCount), - ("load.count", loadCount), - ("update.probe.count", updateProbeCount), - ("update.triggered.count", updateTriggeredCount)) + ("load.count", loadCount)) /** all metrics, including timers */ private val allMetrics = counters ++ Seq( - ("load.timer", loadTimer), - ("update.probe.timer", updateProbeTimer)) + ("load.timer", loadTimer)) /** * Name of metric source @@ -498,23 +366,11 @@ private[history] trait ApplicationCacheOperations { * Implementation note: there's some abuse of a shared global entry here because * the configuration data passed to the servlet is just a string:string map. */ -private[history] class ApplicationCacheCheckFilter() extends Filter with Logging { - - import ApplicationCacheCheckFilterRelay._ - var appId: String = _ - var attemptId: Option[String] = _ - - /** - * Bind the app and attempt ID, throwing an exception if no application ID was provided. - * @param filterConfig configuration - */ - override def init(filterConfig: FilterConfig): Unit = { - - appId = Option(filterConfig.getInitParameter(APP_ID)) - .getOrElse(throw new ServletException(s"Missing Parameter $APP_ID")) - attemptId = Option(filterConfig.getInitParameter(ATTEMPT_ID)) - logDebug(s"initializing filter $this") - } +private[history] class ApplicationCacheCheckFilter( + key: CacheKey, + loadedUI: LoadedAppUI, + cache: ApplicationCache) + extends Filter with Logging { /** * Filter the request. @@ -543,123 +399,24 @@ private[history] class ApplicationCacheCheckFilter() extends Filter with Logging // if the request is for an attempt, check to see if it is in need of delete/refresh // and have the cache update the UI if so - if (operation=="HEAD" || operation=="GET" - && checkForUpdates(requestURI, appId, attemptId)) { - // send a redirect back to the same location. This will be routed - // to the *new* UI - logInfo(s"Application Attempt $appId/$attemptId updated; refreshing") + loadedUI.lock.readLock().lock() + if (loadedUI.valid) { + try { + chain.doFilter(request, response) + } finally { + loadedUI.lock.readLock.unlock() + } + } else { + loadedUI.lock.readLock.unlock() + cache.invalidate(key) val queryStr = Option(httpRequest.getQueryString).map("?" 
+ _).getOrElse("") val redirectUrl = httpResponse.encodeRedirectURL(requestURI + queryStr) httpResponse.sendRedirect(redirectUrl) - } else { - chain.doFilter(request, response) } } - override def destroy(): Unit = { - } + override def init(config: FilterConfig): Unit = { } - override def toString: String = s"ApplicationCacheCheckFilter for $appId/$attemptId" -} + override def destroy(): Unit = { } -/** - * Global state for the [[ApplicationCacheCheckFilter]] instances, so that they can relay cache - * probes to the cache. - * - * This is an ugly workaround for the limitation of servlets and filters in the Java servlet - * API; they are still configured on the model of a list of classnames and configuration - * strings in a `web.xml` field, rather than a chain of instances wired up by hand or - * via an injection framework. There is no way to directly configure a servlet filter instance - * with a reference to the application cache which is must use: some global state is needed. - * - * Here, [[ApplicationCacheCheckFilter]] is that global state; it relays all requests - * to the singleton [[ApplicationCache]] - * - * The field `applicationCache` must be set for the filters to work - - * this is done during the construction of [[ApplicationCache]], which requires that there - * is only one cache serving requests through the WebUI. - * - * *Important* In test runs, if there is more than one [[ApplicationCache]], the relay logic - * will break: filters may not find instances. Tests must not do that. - * - */ -private[history] object ApplicationCacheCheckFilterRelay extends Logging { - // name of the app ID entry in the filter configuration. Mandatory. - val APP_ID = "appId" - - // name of the attempt ID entry in the filter configuration. Optional. - val ATTEMPT_ID = "attemptId" - - // name of the filter to register - val FILTER_NAME = "org.apache.spark.deploy.history.ApplicationCacheCheckFilter" - - /** the application cache to relay requests to */ - @volatile - private var applicationCache: Option[ApplicationCache] = None - - /** - * Set the application cache. Logs a warning if it is overwriting an existing value - * @param cache new cache - */ - def setApplicationCache(cache: ApplicationCache): Unit = { - applicationCache.foreach( c => logWarning(s"Overwriting application cache $c")) - applicationCache = Some(cache) - } - - /** - * Reset the application cache - */ - def resetApplicationCache(): Unit = { - applicationCache = None - } - - /** - * Check to see if there has been an update - * @param requestURI URI the request came in on - * @param appId application ID - * @param attemptId attempt ID - * @return true if an update was loaded for the app/attempt - */ - def checkForUpdates(requestURI: String, appId: String, attemptId: Option[String]): Boolean = { - - logDebug(s"Checking $appId/$attemptId from $requestURI") - applicationCache match { - case Some(cache) => - try { - cache.checkForUpdates(appId, attemptId) - } catch { - case ex: Exception => - // something went wrong. 
Keep going with the existing UI - logWarning(s"When checking for $appId/$attemptId from $requestURI", ex) - false - } - - case None => - logWarning("No application cache instance defined") - false - } - } - - - /** - * Register a filter for the web UI which checks for updates to the given app/attempt - * @param ui Spark UI to attach filters to - * @param appId application ID - * @param attemptId attempt ID - */ - def registerFilter( - ui: SparkUI, - appId: String, - attemptId: Option[String] ): Unit = { - require(ui != null) - val enumDispatcher = java.util.EnumSet.of(DispatcherType.ASYNC, DispatcherType.REQUEST) - val holder = new FilterHolder() - holder.setClassName(FILTER_NAME) - holder.setInitParameter(APP_ID, appId) - attemptId.foreach( id => holder.setInitParameter(ATTEMPT_ID, id)) - require(ui.getHandlers != null, "null handlers") - ui.getHandlers.foreach { handler => - handler.addFilter(holder, "/*", enumDispatcher) - } - } } diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala index 5cb48ca3e60b0..38f0d6f2afa5e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala @@ -17,6 +17,7 @@ package org.apache.spark.deploy.history +import java.util.concurrent.locks.ReentrantReadWriteLock import java.util.zip.ZipOutputStream import scala.xml.Node @@ -48,30 +49,44 @@ private[spark] case class ApplicationHistoryInfo( } /** - * A probe which can be invoked to see if a loaded Web UI has been updated. - * The probe is expected to be relative purely to that of the UI returned - * in the same [[LoadedAppUI]] instance. That is, whenever a new UI is loaded, - * the probe returned with it is the one that must be used to check for it - * being out of date; previous probes must be discarded. - */ -private[history] abstract class HistoryUpdateProbe { - /** - * Return true if the history provider has a later version of the application - * attempt than the one against this probe was constructed. - * @return - */ - def isUpdated(): Boolean -} - -/** - * All the information returned from a call to `getAppUI()`: the new UI - * and any required update state. + * A loaded UI for a Spark application. + * + * Loaded UIs are valid once created, and can be invalidated once the history provider detects + * changes in the underlying app data (e.g. an updated event log). Invalidating a UI does not + * unload it; it just signals the [[ApplicationCache]] that the UI should not be used to serve + * new requests. + * + * Reloading of the UI with new data requires collaboration between the cache and the provider; + * the provider invalidates the UI when it detects updated information, and the cache invalidates + * the cache entry when it detects the UI has been invalidated. That will trigger a callback + * on the provider to finally clean up any UI state. The cache should hold read locks when + * using the UI, and the provider should grab the UI's write lock before making destructive + * operations. + * + * Note that all this means that an invalidated UI will still stay in-memory, and any resources it + * references will remain open, until the cache either sees that it's invalidated, or evicts it to + * make room for another UI. 
+ * * @param ui Spark UI - * @param updateProbe probe to call to check on the update state of this application attempt */ -private[history] case class LoadedAppUI( - ui: SparkUI, - updateProbe: () => Boolean) +private[history] case class LoadedAppUI(ui: SparkUI) { + + val lock = new ReentrantReadWriteLock() + + @volatile private var _valid = true + + def valid: Boolean = _valid + + def invalidate(): Unit = { + lock.writeLock().lock() + try { + _valid = false + } finally { + lock.writeLock().unlock() + } + } + +} private[history] abstract class ApplicationHistoryProvider { @@ -145,4 +160,10 @@ private[history] abstract class ApplicationHistoryProvider { * @return html text to display when the application list is empty */ def getEmptyListingHtml(): Seq[Node] = Seq.empty + + /** + * Called when an application UI is unloaded from the history server. + */ + def onUIDetached(appId: String, attemptId: Option[String], ui: SparkUI): Unit = { } + } diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index cf97597b484d8..f16dddea9f784 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -18,7 +18,7 @@ package org.apache.spark.deploy.history import java.io.{File, FileNotFoundException, IOException} -import java.util.{Date, UUID} +import java.util.{Date, ServiceLoader, UUID} import java.util.concurrent.{Executors, ExecutorService, Future, TimeUnit} import java.util.zip.{ZipEntry, ZipOutputStream} @@ -26,8 +26,7 @@ import scala.collection.JavaConverters._ import scala.collection.mutable import scala.xml.Node -import com.fasterxml.jackson.annotation.{JsonIgnore, JsonInclude} -import com.fasterxml.jackson.module.scala.DefaultScalaModule +import com.fasterxml.jackson.annotation.JsonIgnore import com.google.common.io.ByteStreams import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder} import org.apache.hadoop.fs.{FileStatus, Path} @@ -42,6 +41,7 @@ import org.apache.spark.deploy.history.config._ import org.apache.spark.internal.Logging import org.apache.spark.scheduler._ import org.apache.spark.scheduler.ReplayListenerBus._ +import org.apache.spark.status.{AppStatusListener, AppStatusStore, AppStatusStoreMetadata, KVUtils} import org.apache.spark.status.KVUtils._ import org.apache.spark.status.api.v1 import org.apache.spark.ui.SparkUI @@ -61,9 +61,6 @@ import org.apache.spark.util.kvstore._ * and update or create a matching application info element in the list of applications. * - Updated attempts are also found in [[checkForLogs]] -- if the attempt's log file has grown, the * attempt is replaced by another one with a larger log size. - * - When [[updateProbe()]] is invoked to check if a loaded [[SparkUI]] - * instance is out of date, the log size of the cached instance is checked against the app last - * loaded by [[checkForLogs]]. * * The use of log size, rather than simply relying on modification times, is needed to * address the following issues @@ -125,23 +122,30 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) private val pendingReplayTasksCount = new java.util.concurrent.atomic.AtomicInteger(0) - private val storePath = conf.get(LOCAL_STORE_DIR) + private val storePath = conf.get(LOCAL_STORE_DIR).map(new File(_)) // Visible for testing. 
private[history] val listing: KVStore = storePath.map { path => + require(path.isDirectory(), s"Configured store directory ($path) does not exist.") val dbPath = new File(path, "listing.ldb") - val metadata = new FsHistoryProviderMetadata(CURRENT_LISTING_VERSION, logDir.toString()) + val metadata = new FsHistoryProviderMetadata(CURRENT_LISTING_VERSION, + AppStatusStore.CURRENT_VERSION, logDir.toString()) try { open(new File(path, "listing.ldb"), metadata) } catch { + // If there's an error, remove the listing database and any existing UI database + // from the store directory, since it's extremely likely that they'll all contain + // incompatible information. case _: UnsupportedStoreVersionException | _: MetadataMismatchException => logInfo("Detected incompatible DB versions, deleting...") - Utils.deleteRecursively(dbPath) + path.listFiles().foreach(Utils.deleteRecursively) open(new File(path, "listing.ldb"), metadata) } }.getOrElse(new InMemoryStore()) + private val activeUIs = new mutable.HashMap[(String, Option[String]), LoadedAppUI]() + /** * Return a runnable that performs the given operation on the event logs. * This operation is expected to be executed periodically. @@ -165,7 +169,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } } - // Conf option used for testing the initialization code. val initThread = initialize() private[history] def initialize(): Thread = { @@ -268,42 +271,100 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) override def getLastUpdatedTime(): Long = lastScanTime.get() override def getAppUI(appId: String, attemptId: Option[String]): Option[LoadedAppUI] = { - try { - val appInfo = load(appId) - appInfo.attempts - .find(_.info.attemptId == attemptId) - .map { attempt => - val replayBus = new ReplayListenerBus() - val ui = { - val conf = this.conf.clone() - val appSecManager = new SecurityManager(conf) - SparkUI.createHistoryUI(conf, replayBus, appSecManager, appInfo.info.name, - HistoryServer.getAttemptURI(appId, attempt.info.attemptId), - Some(attempt.info.lastUpdated.getTime()), attempt.info.startTime.getTime()) - // Do not call ui.bind() to avoid creating a new server for each application - } + val app = try { + load(appId) + } catch { + case _: NoSuchElementException => + return None + } + + val attempt = app.attempts.find(_.info.attemptId == attemptId).orNull + if (attempt == null) { + return None + } - val fileStatus = fs.getFileStatus(new Path(logDir, attempt.logPath)) - - val appListener = replay(fileStatus, isApplicationCompleted(fileStatus), replayBus) - assert(appListener.appId.isDefined) - ui.appSparkVersion = appListener.appSparkVersion.getOrElse("") - ui.getSecurityManager.setAcls(HISTORY_UI_ACLS_ENABLE) - // make sure to set admin acls before view acls so they are properly picked up - val adminAcls = HISTORY_UI_ADMIN_ACLS + "," + appListener.adminAcls.getOrElse("") - ui.getSecurityManager.setAdminAcls(adminAcls) - ui.getSecurityManager.setViewAcls(attempt.info.sparkUser, - appListener.viewAcls.getOrElse("")) - val adminAclsGroups = HISTORY_UI_ADMIN_ACLS_GROUPS + "," + - appListener.adminAclsGroups.getOrElse("") - ui.getSecurityManager.setAdminAclsGroups(adminAclsGroups) - ui.getSecurityManager.setViewAclsGroups(appListener.viewAclsGroups.getOrElse("")) - LoadedAppUI(ui, () => updateProbe(appId, attemptId, attempt.fileSize)) + val conf = this.conf.clone() + val secManager = new SecurityManager(conf) + + secManager.setAcls(HISTORY_UI_ACLS_ENABLE) + // make sure to set admin acls before view acls 
so they are properly picked up + secManager.setAdminAcls(HISTORY_UI_ADMIN_ACLS + "," + attempt.adminAcls.getOrElse("")) + secManager.setViewAcls(attempt.info.sparkUser, attempt.viewAcls.getOrElse("")) + secManager.setAdminAclsGroups(HISTORY_UI_ADMIN_ACLS_GROUPS + "," + + attempt.adminAclsGroups.getOrElse("")) + secManager.setViewAclsGroups(attempt.viewAclsGroups.getOrElse("")) + + val replayBus = new ReplayListenerBus() + + val uiStorePath = storePath.map { path => getStorePath(path, appId, attemptId) } + + val (kvstore, needReplay) = uiStorePath match { + case Some(path) => + try { + val _replay = !path.isDirectory() + (createDiskStore(path, conf), _replay) + } catch { + case e: Exception => + // Get rid of the old data and re-create it. The store is either old or corrupted. + logWarning(s"Failed to load disk store $uiStorePath for $appId.", e) + Utils.deleteRecursively(path) + (createDiskStore(path, conf), true) } + + case _ => + (new InMemoryStore(), true) + } + + val listener = if (needReplay) { + val _listener = new AppStatusListener(kvstore, conf, false) + replayBus.addListener(_listener) + Some(_listener) + } else { + None + } + + val loadedUI = { + val ui = SparkUI.create(None, new AppStatusStore(kvstore), conf, + l => replayBus.addListener(l), + secManager, + app.info.name, + HistoryServer.getAttemptURI(appId, attempt.info.attemptId), + attempt.info.startTime.getTime(), + appSparkVersion = attempt.info.appSparkVersion) + LoadedAppUI(ui) + } + + try { + val listenerFactories = ServiceLoader.load(classOf[SparkHistoryListenerFactory], + Utils.getContextOrSparkClassLoader).asScala + listenerFactories.foreach { listenerFactory => + val listeners = listenerFactory.createListeners(conf, loadedUI.ui) + listeners.foreach(replayBus.addListener) + } + + val fileStatus = fs.getFileStatus(new Path(logDir, attempt.logPath)) + replay(fileStatus, isApplicationCompleted(fileStatus), replayBus) + listener.foreach(_.flush()) } catch { - case _: FileNotFoundException => None - case _: NoSuchElementException => None + case e: Exception => + try { + kvstore.close() + } catch { + case _e: Exception => logInfo("Error closing store.", _e) + } + uiStorePath.foreach(Utils.deleteRecursively) + if (e.isInstanceOf[FileNotFoundException]) { + return None + } else { + throw e + } + } + + synchronized { + activeUIs((appId, attemptId)) = loadedUI } + + Some(loadedUI) } override def getEmptyListingHtml(): Seq[Node] = { @@ -332,11 +393,40 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) initThread.interrupt() initThread.join() } + Seq(pool, replayExecutor).foreach { executor => + executor.shutdown() + if (!executor.awaitTermination(5, TimeUnit.SECONDS)) { + executor.shutdownNow() + } + } } finally { + activeUIs.foreach { case (_, loadedUI) => loadedUI.ui.store.close() } + activeUIs.clear() listing.close() } } + override def onUIDetached(appId: String, attemptId: Option[String], ui: SparkUI): Unit = { + val uiOption = synchronized { + activeUIs.remove((appId, attemptId)) + } + uiOption.foreach { loadedUI => + loadedUI.lock.writeLock().lock() + try { + loadedUI.ui.store.close() + } finally { + loadedUI.lock.writeLock().unlock() + } + + // If the UI is not valid, delete its files from disk, if any. This relies on the fact that + // ApplicationCache will never call this method concurrently with getAppUI() for the same + // appId / attemptId. 
+ if (!loadedUI.valid && storePath.isDefined) { + Utils.deleteRecursively(getStorePath(storePath.get, appId, attemptId)) + } + } + } + /** * Builds the application list based on the current contents of the log directory. * Tries to reuse as much of the data already in memory as possible, by not reading @@ -475,7 +565,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) val eventsFilter: ReplayEventsFilter = { eventString => eventString.startsWith(APPL_START_EVENT_PREFIX) || eventString.startsWith(APPL_END_EVENT_PREFIX) || - eventString.startsWith(LOG_START_EVENT_PREFIX) + eventString.startsWith(LOG_START_EVENT_PREFIX) || + eventString.startsWith(ENV_UPDATE_EVENT_PREFIX) } val logPath = fileStatus.getPath() @@ -486,8 +577,19 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) bus.addListener(listener) replay(fileStatus, isApplicationCompleted(fileStatus), bus, eventsFilter) - listener.applicationInfo.foreach(addListing) - listing.write(LogInfo(logPath.toString(), fileStatus.getLen())) + listener.applicationInfo.foreach { app => + // Invalidate the existing UI for the reloaded app attempt, if any. See LoadedAppUI for a + // discussion on the UI lifecycle. + synchronized { + activeUIs.get((app.info.id, app.attempts.head.info.attemptId)).foreach { ui => + ui.invalidate() + ui.ui.store.close() + } + } + + addListing(app) + } + listing.write(new LogInfo(logPath.toString(), fileStatus.getLen())) } /** @@ -546,16 +648,14 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } /** - * Replays the events in the specified log file on the supplied `ReplayListenerBus`. Returns - * an `ApplicationEventListener` instance with event data captured from the replay. - * `ReplayEventsFilter` determines what events are replayed and can therefore limit the - * data captured in the returned `ApplicationEventListener` instance. + * Replays the events in the specified log file on the supplied `ReplayListenerBus`. + * `ReplayEventsFilter` determines what events are replayed. */ private def replay( eventLog: FileStatus, appCompleted: Boolean, bus: ReplayListenerBus, - eventsFilter: ReplayEventsFilter = SELECT_ALL_FILTER): ApplicationEventListener = { + eventsFilter: ReplayEventsFilter = SELECT_ALL_FILTER): Unit = { val logPath = eventLog.getPath() logInfo(s"Replaying log path: $logPath") // Note that the eventLog may have *increased* in size since when we grabbed the filestatus, @@ -566,11 +666,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) // after it's created, so we get a file size that is no bigger than what is actually read. val logInput = EventLoggingListener.openEventLog(logPath, fs) try { - val appListener = new ApplicationEventListener - bus.addListener(appListener) bus.replay(logInput, logPath.toString, !appCompleted, eventsFilter) logInfo(s"Finished replaying $logPath") - appListener } finally { logInput.close() } @@ -613,32 +710,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) | application count=$count}""".stripMargin } - /** - * Return true iff a newer version of the UI is available. The check is based on whether the - * fileSize for the currently loaded UI is smaller than the file size the last time - * the logs were loaded. - * - * This is a very cheap operation -- the work of loading the new attempt was already done - * by [[checkForLogs]]. 
- * @param appId application to probe - * @param attemptId attempt to probe - * @param prevFileSize the file size of the logs for the currently displayed UI - */ - private def updateProbe( - appId: String, - attemptId: Option[String], - prevFileSize: Long)(): Boolean = { - try { - val attempt = getAttempt(appId, attemptId) - val logPath = fs.makeQualified(new Path(logDir, attempt.logPath)) - recordedFileSize(logPath) > prevFileSize - } catch { - case _: NoSuchElementException => - logDebug(s"Application Attempt $appId/$attemptId not found") - false - } - } - /** * Return the last known size of the given event log, recorded the last time the file * system scanner detected a change in the file. @@ -682,6 +753,16 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) listing.write(newAppInfo) } + private def createDiskStore(path: File, conf: SparkConf): KVStore = { + val metadata = new AppStatusStoreMetadata(AppStatusStore.CURRENT_VERSION) + KVUtils.open(path, metadata) + } + + private def getStorePath(path: File, appId: String, attemptId: Option[String]): File = { + val fileName = appId + attemptId.map("_" + _).getOrElse("") + ".ldb" + new File(path, fileName) + } + /** For testing. Returns internal data about a single attempt. */ private[history] def getAttempt(appId: String, attemptId: Option[String]): AttemptInfoWrapper = { load(appId).attempts.find(_.info.attemptId == attemptId).getOrElse( @@ -699,6 +780,8 @@ private[history] object FsHistoryProvider { private val LOG_START_EVENT_PREFIX = "{\"Event\":\"SparkListenerLogStart\"" + private val ENV_UPDATE_EVENT_PREFIX = "{\"Event\":\"SparkListenerEnvironmentUpdate\"," + /** * Current version of the data written to the listing database. When opening an existing * db, if the version does not match this value, the FsHistoryProvider will throw away @@ -708,17 +791,22 @@ private[history] object FsHistoryProvider { } private[history] case class FsHistoryProviderMetadata( - version: Long, - logDir: String) + version: Long, + uiVersion: Long, + logDir: String) private[history] case class LogInfo( - @KVIndexParam logPath: String, - fileSize: Long) + @KVIndexParam logPath: String, + fileSize: Long) private[history] class AttemptInfoWrapper( val info: v1.ApplicationAttemptInfo, val logPath: String, - val fileSize: Long) { + val fileSize: Long, + val adminAcls: Option[String], + val viewAcls: Option[String], + val adminAclsGroups: Option[String], + val viewAclsGroups: Option[String]) { def toAppAttemptInfo(): ApplicationAttemptInfo = { ApplicationAttemptInfo(info.attemptId, info.startTime.getTime(), @@ -769,6 +857,14 @@ private[history] class AppListingListener(log: FileStatus, clock: Clock) extends attempt.completed = true } + override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate): Unit = { + val allProperties = event.environmentDetails("Spark Properties").toMap + attempt.viewAcls = allProperties.get("spark.ui.view.acls") + attempt.adminAcls = allProperties.get("spark.admin.acls") + attempt.viewAclsGroups = allProperties.get("spark.ui.view.acls.groups") + attempt.adminAclsGroups = allProperties.get("spark.admin.acls.groups") + } + override def onOtherEvent(event: SparkListenerEvent): Unit = event match { case SparkListenerLogStart(sparkVersion) => attempt.appSparkVersion = sparkVersion @@ -809,6 +905,11 @@ private[history] class AppListingListener(log: FileStatus, clock: Clock) extends var completed = false var appSparkVersion = "" + var adminAcls: Option[String] = None + var viewAcls: Option[String] = None + var 
adminAclsGroups: Option[String] = None + var viewAclsGroups: Option[String] = None + def toView(): AttemptInfoWrapper = { val apiInfo = new v1.ApplicationAttemptInfo( attemptId, @@ -822,7 +923,11 @@ private[history] class AppListingListener(log: FileStatus, clock: Clock) extends new AttemptInfoWrapper( apiInfo, logPath, - fileSize) + fileSize, + adminAcls, + viewAcls, + adminAclsGroups, + viewAclsGroups) } } diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index d9c8fda99ef97..b822a48e98e91 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -106,8 +106,8 @@ class HistoryServer( } } - def getSparkUI(appKey: String): Option[SparkUI] = { - appCache.getSparkUI(appKey) + override def withSparkUI[T](appId: String, attemptId: Option[String])(fn: SparkUI => T): T = { + appCache.withSparkUI(appId, attemptId)(fn) } initialize() @@ -140,7 +140,6 @@ class HistoryServer( override def stop() { super.stop() provider.stop() - appCache.stop() } /** Attach a reconstructed UI to this server. Only valid after bind(). */ @@ -158,6 +157,7 @@ class HistoryServer( override def detachSparkUI(appId: String, attemptId: Option[String], ui: SparkUI): Unit = { assert(serverInfo.isDefined, "HistoryServer must be bound before detaching SparkUIs") ui.getHandlers.foreach(detachHandler) + provider.onUIDetached(appId, attemptId, ui) } /** @@ -224,15 +224,13 @@ class HistoryServer( */ private def loadAppUi(appId: String, attemptId: Option[String]): Boolean = { try { - appCache.get(appId, attemptId) + appCache.withSparkUI(appId, attemptId) { _ => + // Do nothing, just force the UI to load. + } true } catch { - case NonFatal(e) => e.getCause() match { - case nsee: NoSuchElementException => - false - - case cause: Exception => throw cause - } + case NonFatal(e: NoSuchElementException) => + false } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala b/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala deleted file mode 100644 index 6da8865cd10d3..0000000000000 --- a/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.scheduler - -/** - * A simple listener for application events. - * - * This listener expects to hear events from a single application only. If events - * from multiple applications are seen, the behavior is unspecified. 
- */ -private[spark] class ApplicationEventListener extends SparkListener { - var appName: Option[String] = None - var appId: Option[String] = None - var appAttemptId: Option[String] = None - var sparkUser: Option[String] = None - var startTime: Option[Long] = None - var endTime: Option[Long] = None - var viewAcls: Option[String] = None - var adminAcls: Option[String] = None - var viewAclsGroups: Option[String] = None - var adminAclsGroups: Option[String] = None - var appSparkVersion: Option[String] = None - - override def onApplicationStart(applicationStart: SparkListenerApplicationStart) { - appName = Some(applicationStart.appName) - appId = applicationStart.appId - appAttemptId = applicationStart.appAttemptId - startTime = Some(applicationStart.time) - sparkUser = Some(applicationStart.sparkUser) - } - - override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) { - endTime = Some(applicationEnd.time) - } - - override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate) { - synchronized { - val environmentDetails = environmentUpdate.environmentDetails - val allProperties = environmentDetails("Spark Properties").toMap - viewAcls = allProperties.get("spark.ui.view.acls") - adminAcls = allProperties.get("spark.admin.acls") - viewAclsGroups = allProperties.get("spark.ui.view.acls.groups") - adminAclsGroups = allProperties.get("spark.admin.acls.groups") - } - } - - override def onOtherEvent(event: SparkListenerEvent): Unit = event match { - case SparkListenerLogStart(sparkVersion) => - appSparkVersion = Some(sparkVersion) - case _ => - } -} diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index f120685c941df..cd43612fae357 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -34,12 +34,22 @@ import org.apache.spark.util.kvstore.KVStore * A Spark listener that writes application information to a data store. The types written to the * store are defined in the `storeTypes.scala` file and are based on the public REST API. */ -private class AppStatusListener(kvstore: KVStore) extends SparkListener with Logging { +private[spark] class AppStatusListener( + kvstore: KVStore, + conf: SparkConf, + live: Boolean) extends SparkListener with Logging { + + import config._ private var sparkVersion = SPARK_VERSION private var appInfo: v1.ApplicationInfo = null private var coresPerTask: Int = 1 + // How often to update live entities. -1 means "never update" when replaying applications, + // meaning only the last write will happen. For live applications, this avoids a few + // operations that we can live without when rapidly processing incoming task events. + private val liveUpdatePeriodNs = if (live) conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L + // Keep track of live entities, so that task metrics can be efficiently updated (without // causing too many writes to the underlying store, and other expensive operations). 
private val liveStages = new HashMap[(Int, Int), LiveStage]() @@ -110,13 +120,13 @@ private class AppStatusListener(kvstore: KVStore) extends SparkListener with Log exec.totalCores = event.executorInfo.totalCores exec.maxTasks = event.executorInfo.totalCores / coresPerTask exec.executorLogs = event.executorInfo.logUrlMap - update(exec) + liveUpdate(exec, System.nanoTime()) } override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit = { liveExecutors.remove(event.executorId).foreach { exec => exec.isActive = false - update(exec) + update(exec, System.nanoTime()) } } @@ -139,21 +149,25 @@ private class AppStatusListener(kvstore: KVStore) extends SparkListener with Log private def updateBlackListStatus(execId: String, blacklisted: Boolean): Unit = { liveExecutors.get(execId).foreach { exec => exec.isBlacklisted = blacklisted - update(exec) + liveUpdate(exec, System.nanoTime()) } } private def updateNodeBlackList(host: String, blacklisted: Boolean): Unit = { + val now = System.nanoTime() + // Implicitly (un)blacklist every executor associated with the node. liveExecutors.values.foreach { exec => if (exec.hostname == host) { exec.isBlacklisted = blacklisted - update(exec) + liveUpdate(exec, now) } } } override def onJobStart(event: SparkListenerJobStart): Unit = { + val now = System.nanoTime() + // Compute (a potential over-estimate of) the number of tasks that will be run by this job. // This may be an over-estimate because the job start event references all of the result // stages' transitive stage dependencies, but some of these stages might be skipped if their @@ -178,7 +192,7 @@ private class AppStatusListener(kvstore: KVStore) extends SparkListener with Log jobGroup, numTasks) liveJobs.put(event.jobId, job) - update(job) + liveUpdate(job, now) event.stageInfos.foreach { stageInfo => // A new job submission may re-use an existing stage, so this code needs to do an update @@ -186,7 +200,7 @@ private class AppStatusListener(kvstore: KVStore) extends SparkListener with Log val stage = getOrCreateStage(stageInfo) stage.jobs :+= job stage.jobIds += event.jobId - update(stage) + liveUpdate(stage, now) } } @@ -198,11 +212,12 @@ private class AppStatusListener(kvstore: KVStore) extends SparkListener with Log } job.completionTime = Some(new Date(event.time)) - update(job) + update(job, System.nanoTime()) } } override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = { + val now = System.nanoTime() val stage = getOrCreateStage(event.stageInfo) stage.status = v1.StageStatus.ACTIVE stage.schedulingPool = Option(event.properties).flatMap { p => @@ -218,38 +233,39 @@ private class AppStatusListener(kvstore: KVStore) extends SparkListener with Log stage.jobs.foreach { job => job.completedStages = job.completedStages - event.stageInfo.stageId job.activeStages += 1 - update(job) + liveUpdate(job, now) } event.stageInfo.rddInfos.foreach { info => if (info.storageLevel.isValid) { - update(liveRDDs.getOrElseUpdate(info.id, new LiveRDD(info))) + liveUpdate(liveRDDs.getOrElseUpdate(info.id, new LiveRDD(info)), now) } } - update(stage) + liveUpdate(stage, now) } override def onTaskStart(event: SparkListenerTaskStart): Unit = { + val now = System.nanoTime() val task = new LiveTask(event.taskInfo, event.stageId, event.stageAttemptId) liveTasks.put(event.taskInfo.taskId, task) - update(task) + liveUpdate(task, now) liveStages.get((event.stageId, event.stageAttemptId)).foreach { stage => stage.activeTasks += 1 stage.firstLaunchTime = math.min(stage.firstLaunchTime, 
event.taskInfo.launchTime) - update(stage) + maybeUpdate(stage, now) stage.jobs.foreach { job => job.activeTasks += 1 - update(job) + maybeUpdate(job, now) } } liveExecutors.get(event.taskInfo.executorId).foreach { exec => exec.activeTasks += 1 exec.totalTasks += 1 - update(exec) + maybeUpdate(exec, now) } } @@ -257,7 +273,7 @@ private class AppStatusListener(kvstore: KVStore) extends SparkListener with Log // Call update on the task so that the "getting result" time is written to the store; the // value is part of the mutable TaskInfo state that the live entity already references. liveTasks.get(event.taskInfo.taskId).foreach { task => - update(task) + maybeUpdate(task, System.nanoTime()) } } @@ -267,6 +283,8 @@ private class AppStatusListener(kvstore: KVStore) extends SparkListener with Log return } + val now = System.nanoTime() + val metricsDelta = liveTasks.remove(event.taskInfo.taskId).map { task => val errorMessage = event.reason match { case Success => @@ -283,7 +301,7 @@ private class AppStatusListener(kvstore: KVStore) extends SparkListener with Log } task.errorMessage = errorMessage val delta = task.updateMetrics(event.taskMetrics) - update(task) + update(task, now) delta }.orNull @@ -301,13 +319,13 @@ private class AppStatusListener(kvstore: KVStore) extends SparkListener with Log stage.activeTasks -= 1 stage.completedTasks += completedDelta stage.failedTasks += failedDelta - update(stage) + maybeUpdate(stage, now) stage.jobs.foreach { job => job.activeTasks -= 1 job.completedTasks += completedDelta job.failedTasks += failedDelta - update(job) + maybeUpdate(job, now) } val esummary = stage.executorSummary(event.taskInfo.executorId) @@ -317,7 +335,7 @@ private class AppStatusListener(kvstore: KVStore) extends SparkListener with Log if (metricsDelta != null) { esummary.metrics.update(metricsDelta) } - update(esummary) + maybeUpdate(esummary, now) } liveExecutors.get(event.taskInfo.executorId).foreach { exec => @@ -333,12 +351,13 @@ private class AppStatusListener(kvstore: KVStore) extends SparkListener with Log exec.completedTasks += completedDelta exec.failedTasks += failedDelta exec.totalDuration += event.taskInfo.duration - update(exec) + maybeUpdate(exec, now) } } override def onStageCompleted(event: SparkListenerStageCompleted): Unit = { liveStages.remove((event.stageInfo.stageId, event.stageInfo.attemptId)).foreach { stage => + val now = System.nanoTime() stage.info = event.stageInfo // Because of SPARK-20205, old event logs may contain valid stages without a submission time @@ -349,7 +368,6 @@ private class AppStatusListener(kvstore: KVStore) extends SparkListener with Log case _ if event.stageInfo.submissionTime.isDefined => v1.StageStatus.COMPLETE case _ => v1.StageStatus.SKIPPED } - update(stage) stage.jobs.foreach { job => stage.status match { @@ -362,11 +380,11 @@ private class AppStatusListener(kvstore: KVStore) extends SparkListener with Log job.failedStages += 1 } job.activeStages -= 1 - update(job) + liveUpdate(job, now) } - stage.executorSummaries.values.foreach(update) - update(stage) + stage.executorSummaries.values.foreach(update(_, now)) + update(stage, now) } } @@ -381,7 +399,7 @@ private class AppStatusListener(kvstore: KVStore) extends SparkListener with Log } exec.isActive = true exec.maxMemory = event.maxMem - update(exec) + liveUpdate(exec, System.nanoTime()) } override def onBlockManagerRemoved(event: SparkListenerBlockManagerRemoved): Unit = { @@ -394,19 +412,21 @@ private class AppStatusListener(kvstore: KVStore) extends SparkListener with Log } 
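The liveUpdate/maybeUpdate throttling introduced in this hunk is driven by `liveUpdatePeriodNs`, which is read from a `LIVE_ENTITY_UPDATE_PERIOD` entry in `org.apache.spark.status.config` that is not part of this diff. A minimal sketch of what such a config entry might look like, assuming the standard `ConfigBuilder` API (the key name and default below are assumptions, not taken from this patch):

    package org.apache.spark.status

    import java.util.concurrent.TimeUnit

    import org.apache.spark.internal.config.ConfigBuilder

    private[spark] object config {

      // Assumed: how long a live entity may go unwritten before maybeUpdate() flushes it to
      // the store. Kept in nanoseconds so it can be compared directly against System.nanoTime().
      val LIVE_ENTITY_UPDATE_PERIOD = ConfigBuilder("spark.ui.liveUpdate.period")
        .timeConf(TimeUnit.NANOSECONDS)
        .createWithDefaultString("100ms")

    }

With a period of -1 (the replay case), `maybeUpdate` never writes, so only the explicit `update` calls and the final `flush()` persist data.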
override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { + val now = System.nanoTime() + event.accumUpdates.foreach { case (taskId, sid, sAttempt, accumUpdates) => liveTasks.get(taskId).foreach { task => val metrics = TaskMetrics.fromAccumulatorInfos(accumUpdates) val delta = task.updateMetrics(metrics) - update(task) + maybeUpdate(task, now) liveStages.get((sid, sAttempt)).foreach { stage => stage.metrics.update(delta) - update(stage) + maybeUpdate(stage, now) val esummary = stage.executorSummary(event.execId) esummary.metrics.update(delta) - update(esummary) + maybeUpdate(esummary, now) } } } @@ -419,7 +439,18 @@ private class AppStatusListener(kvstore: KVStore) extends SparkListener with Log } } + /** Flush all live entities' data to the underlying store. */ + def flush(): Unit = { + val now = System.nanoTime() + liveStages.values.foreach(update(_, now)) + liveJobs.values.foreach(update(_, now)) + liveExecutors.values.foreach(update(_, now)) + liveTasks.values.foreach(update(_, now)) + liveRDDs.values.foreach(update(_, now)) + } + private def updateRDDBlock(event: SparkListenerBlockUpdated, block: RDDBlockId): Unit = { + val now = System.nanoTime() val executorId = event.blockUpdatedInfo.blockManagerId.executorId // Whether values are being added to or removed from the existing accounting. @@ -494,7 +525,7 @@ private class AppStatusListener(kvstore: KVStore) extends SparkListener with Log } rdd.memoryUsed = newValue(rdd.memoryUsed, memoryDelta) rdd.diskUsed = newValue(rdd.diskUsed, diskDelta) - update(rdd) + update(rdd, now) } maybeExec.foreach { exec => @@ -508,9 +539,7 @@ private class AppStatusListener(kvstore: KVStore) extends SparkListener with Log exec.memoryUsed = newValue(exec.memoryUsed, memoryDelta) exec.diskUsed = newValue(exec.diskUsed, diskDelta) exec.rddBlocks += rddBlocksDelta - if (exec.hasMemoryInfo || rddBlocksDelta != 0) { - update(exec) - } + maybeUpdate(exec, now) } } @@ -524,8 +553,22 @@ private class AppStatusListener(kvstore: KVStore) extends SparkListener with Log stage } - private def update(entity: LiveEntity): Unit = { - entity.write(kvstore) + private def update(entity: LiveEntity, now: Long): Unit = { + entity.write(kvstore, now) + } + + /** Update a live entity only if it hasn't been updated in the last configured period. */ + private def maybeUpdate(entity: LiveEntity, now: Long): Unit = { + if (liveUpdatePeriodNs >= 0 && now - entity.lastWriteTime > liveUpdatePeriodNs) { + update(entity, now) + } + } + + /** Update an entity only if in a live app; avoids redundant writes when replaying logs. */ + private def liveUpdate(entity: LiveEntity, now: Long): Unit = { + if (live) { + update(entity, now) + } } } diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala new file mode 100644 index 0000000000000..2927a3227cbef --- /dev/null +++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.status + +import java.io.File +import java.util.{Arrays, List => JList} + +import scala.collection.JavaConverters._ + +import org.apache.spark.{JobExecutionStatus, SparkConf} +import org.apache.spark.scheduler.LiveListenerBus +import org.apache.spark.status.api.v1 +import org.apache.spark.util.{Distribution, Utils} +import org.apache.spark.util.kvstore.{InMemoryStore, KVStore} + +/** + * A wrapper around a KVStore that provides methods for accessing the API data stored within. + */ +private[spark] class AppStatusStore(store: KVStore) { + + def jobsList(statuses: JList[JobExecutionStatus]): Seq[v1.JobData] = { + val it = store.view(classOf[JobDataWrapper]).asScala.map(_.info) + if (!statuses.isEmpty()) { + it.filter { job => statuses.contains(job.status) }.toSeq + } else { + it.toSeq + } + } + + def job(jobId: Int): v1.JobData = { + store.read(classOf[JobDataWrapper], jobId).info + } + + def executorList(activeOnly: Boolean): Seq[v1.ExecutorSummary] = { + store.view(classOf[ExecutorSummaryWrapper]).index("active").reverse().first(true) + .last(true).asScala.map(_.info).toSeq + } + + def stageList(statuses: JList[v1.StageStatus]): Seq[v1.StageData] = { + val it = store.view(classOf[StageDataWrapper]).asScala.map(_.info) + if (!statuses.isEmpty) { + it.filter { s => statuses.contains(s.status) }.toSeq + } else { + it.toSeq + } + } + + def stageData(stageId: Int): Seq[v1.StageData] = { + store.view(classOf[StageDataWrapper]).index("stageId").first(stageId).last(stageId) + .asScala.map(_.info).toSeq + } + + def stageAttempt(stageId: Int, stageAttemptId: Int): v1.StageData = { + store.read(classOf[StageDataWrapper], Array(stageId, stageAttemptId)).info + } + + def taskSummary( + stageId: Int, + stageAttemptId: Int, + quantiles: Array[Double]): v1.TaskMetricDistributions = { + + val stage = Array(stageId, stageAttemptId) + + val rawMetrics = store.view(classOf[TaskDataWrapper]) + .index("stage") + .first(stage) + .last(stage) + .asScala + .flatMap(_.info.taskMetrics) + .toList + .view + + def metricQuantiles(f: v1.TaskMetrics => Double): IndexedSeq[Double] = + Distribution(rawMetrics.map { d => f(d) }).get.getQuantiles(quantiles) + + // We need to do a lot of similar munging to nested metrics here. For each one, + // we want (a) extract the values for nested metrics (b) make a distribution for each metric + // (c) shove the distribution into the right field in our return type and (d) only return + // a result if the option is defined for any of the tasks. MetricHelper is a little util + // to make it a little easier to deal w/ all of the nested options. Mostly it lets us just + // implement one "build" method, which just builds the quantiles for each field. 
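For readers unfamiliar with the Distribution(...).get.getQuantiles(quantiles) calls used throughout this method, here is a rough plain-Scala approximation of the idea. It is a hedged sketch using nearest-rank selection, not Spark's Distribution implementation:

    def approxQuantiles(values: Seq[Double], probabilities: Seq[Double]): IndexedSeq[Double] = {
      // Sort once, then pick the sample closest to each requested probability.
      val sorted = values.sorted.toIndexedSeq
      probabilities.map { p =>
        val idx = math.min(sorted.length - 1, math.max(0, (p * sorted.length).toInt))
        sorted(idx)
      }.toIndexedSeq
    }

    // approxQuantiles(Seq(1.0, 2.0, 3.0, 4.0), Seq(0.0, 0.5, 1.0)) == Vector(1.0, 3.0, 4.0)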
+ + val inputMetrics = + new MetricHelper[v1.InputMetrics, v1.InputMetricDistributions](rawMetrics, quantiles) { + def getSubmetrics(raw: v1.TaskMetrics): v1.InputMetrics = raw.inputMetrics + + def build: v1.InputMetricDistributions = new v1.InputMetricDistributions( + bytesRead = submetricQuantiles(_.bytesRead), + recordsRead = submetricQuantiles(_.recordsRead) + ) + }.build + + val outputMetrics = + new MetricHelper[v1.OutputMetrics, v1.OutputMetricDistributions](rawMetrics, quantiles) { + def getSubmetrics(raw: v1.TaskMetrics): v1.OutputMetrics = raw.outputMetrics + + def build: v1.OutputMetricDistributions = new v1.OutputMetricDistributions( + bytesWritten = submetricQuantiles(_.bytesWritten), + recordsWritten = submetricQuantiles(_.recordsWritten) + ) + }.build + + val shuffleReadMetrics = + new MetricHelper[v1.ShuffleReadMetrics, v1.ShuffleReadMetricDistributions](rawMetrics, + quantiles) { + def getSubmetrics(raw: v1.TaskMetrics): v1.ShuffleReadMetrics = + raw.shuffleReadMetrics + + def build: v1.ShuffleReadMetricDistributions = new v1.ShuffleReadMetricDistributions( + readBytes = submetricQuantiles { s => s.localBytesRead + s.remoteBytesRead }, + readRecords = submetricQuantiles(_.recordsRead), + remoteBytesRead = submetricQuantiles(_.remoteBytesRead), + remoteBytesReadToDisk = submetricQuantiles(_.remoteBytesReadToDisk), + remoteBlocksFetched = submetricQuantiles(_.remoteBlocksFetched), + localBlocksFetched = submetricQuantiles(_.localBlocksFetched), + totalBlocksFetched = submetricQuantiles { s => + s.localBlocksFetched + s.remoteBlocksFetched + }, + fetchWaitTime = submetricQuantiles(_.fetchWaitTime) + ) + }.build + + val shuffleWriteMetrics = + new MetricHelper[v1.ShuffleWriteMetrics, v1.ShuffleWriteMetricDistributions](rawMetrics, + quantiles) { + def getSubmetrics(raw: v1.TaskMetrics): v1.ShuffleWriteMetrics = + raw.shuffleWriteMetrics + + def build: v1.ShuffleWriteMetricDistributions = new v1.ShuffleWriteMetricDistributions( + writeBytes = submetricQuantiles(_.bytesWritten), + writeRecords = submetricQuantiles(_.recordsWritten), + writeTime = submetricQuantiles(_.writeTime) + ) + }.build + + new v1.TaskMetricDistributions( + quantiles = quantiles, + executorDeserializeTime = metricQuantiles(_.executorDeserializeTime), + executorDeserializeCpuTime = metricQuantiles(_.executorDeserializeCpuTime), + executorRunTime = metricQuantiles(_.executorRunTime), + executorCpuTime = metricQuantiles(_.executorCpuTime), + resultSize = metricQuantiles(_.resultSize), + jvmGcTime = metricQuantiles(_.jvmGcTime), + resultSerializationTime = metricQuantiles(_.resultSerializationTime), + memoryBytesSpilled = metricQuantiles(_.memoryBytesSpilled), + diskBytesSpilled = metricQuantiles(_.diskBytesSpilled), + inputMetrics = inputMetrics, + outputMetrics = outputMetrics, + shuffleReadMetrics = shuffleReadMetrics, + shuffleWriteMetrics = shuffleWriteMetrics + ) + } + + def taskList( + stageId: Int, + stageAttemptId: Int, + offset: Int, + length: Int, + sortBy: v1.TaskSorting): Seq[v1.TaskData] = { + val stageKey = Array(stageId, stageAttemptId) + val base = store.view(classOf[TaskDataWrapper]) + val indexed = sortBy match { + case v1.TaskSorting.ID => + base.index("stage").first(stageKey).last(stageKey) + case v1.TaskSorting.INCREASING_RUNTIME => + base.index("runtime").first(stageKey ++ Array(-1L)).last(stageKey ++ Array(Long.MaxValue)) + case v1.TaskSorting.DECREASING_RUNTIME => + base.index("runtime").first(stageKey ++ Array(Long.MaxValue)).last(stageKey ++ Array(-1L)) + .reverse() + } + 
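A note on the bounds used in the match above: the "runtime" index added to TaskDataWrapper later in this patch stores the key (stageId, stageAttemptId, runtime) and falls back to -1L when a task has no metrics yet, so the following range brackets every task of the attempt. The values below are hypothetical, for illustration only:

    // Bounds of the range scan over the "runtime" index for stage 3, attempt 0.
    val stageKey = Array(3, 0)
    val first = stageKey ++ Array(-1L)            // smallest stored runtime (tasks without metrics)
    val last  = stageKey ++ Array(Long.MaxValue)  // no task's runtime can exceed this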
indexed.skip(offset).max(length).asScala.map(_.info).toSeq + } + + def rddList(): Seq[v1.RDDStorageInfo] = { + store.view(classOf[RDDStorageInfoWrapper]).asScala.map(_.info).toSeq + } + + def rdd(rddId: Int): v1.RDDStorageInfo = { + store.read(classOf[RDDStorageInfoWrapper], rddId).info + } + + def close(): Unit = { + store.close() + } + +} + +private[spark] object AppStatusStore { + + val CURRENT_VERSION = 1L + + /** + * Create an in-memory store for a live application. + * + * @param conf Configuration. + * @param bus Where to attach the listener to populate the store. + */ + def createLiveStore(conf: SparkConf, bus: LiveListenerBus): AppStatusStore = { + val store = new InMemoryStore() + val stateStore = new AppStatusStore(store) + bus.addToStatusQueue(new AppStatusListener(store, conf, true)) + stateStore + } + +} + +/** + * Helper for getting distributions from nested metric types. + */ +private abstract class MetricHelper[I, O]( + rawMetrics: Seq[v1.TaskMetrics], + quantiles: Array[Double]) { + + def getSubmetrics(raw: v1.TaskMetrics): I + + def build: O + + val data: Seq[I] = rawMetrics.map(getSubmetrics) + + /** applies the given function to all input metrics, and returns the quantiles */ + def submetricQuantiles(f: I => Double): IndexedSeq[Double] = { + Distribution(data.map { d => f(d) }).get.getQuantiles(quantiles) + } +} diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala index 63fa36580bc7d..041dfe1ef915e 100644 --- a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala +++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala @@ -37,8 +37,11 @@ import org.apache.spark.util.kvstore.KVStore */ private[spark] abstract class LiveEntity { - def write(store: KVStore): Unit = { + var lastWriteTime = 0L + + def write(store: KVStore, now: Long): Unit = { store.write(doUpdate()) + lastWriteTime = now } /** @@ -204,7 +207,7 @@ private class LiveTask( newAccumulatorInfos(info.accumulables), errorMessage, Option(recordedMetrics)) - new TaskDataWrapper(task) + new TaskDataWrapper(task, stageId, stageAttemptId) } } diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala index f17b637754826..9d3833086172f 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala @@ -248,7 +248,13 @@ private[spark] object ApiRootResource { * interface needed for them all to expose application info as json. */ private[spark] trait UIRoot { - def getSparkUI(appKey: String): Option[SparkUI] + /** + * Runs some code with the current SparkUI instance for the app / attempt. + * + * @throws NoSuchElementException If the app / attempt pair does not exist. + */ + def withSparkUI[T](appId: String, attemptId: Option[String])(fn: SparkUI => T): T + def getApplicationInfoList: Iterator[ApplicationInfo] def getApplicationInfo(appId: String): Option[ApplicationInfo] @@ -293,15 +299,18 @@ private[v1] trait ApiRequestContext { * to it. 
If there is no such app, throw an appropriate exception */ def withSparkUI[T](appId: String, attemptId: Option[String])(f: SparkUI => T): T = { - val appKey = attemptId.map(appId + "/" + _).getOrElse(appId) - uiRoot.getSparkUI(appKey) match { - case Some(ui) => + try { + uiRoot.withSparkUI(appId, attemptId) { ui => val user = httpRequest.getRemoteUser() if (!ui.securityManager.checkUIViewPermissions(user)) { throw new ForbiddenException(raw"""user "$user" is not authorized""") } f(ui) - case None => throw new NotFoundException("no such app: " + appId) + } + } catch { + case _: NoSuchElementException => + val appKey = attemptId.map(appId + "/" + _).getOrElse(appId) + throw new NotFoundException(s"no such app: $appKey") } } } diff --git a/core/src/main/scala/org/apache/spark/status/config.scala b/core/src/main/scala/org/apache/spark/status/config.scala new file mode 100644 index 0000000000000..49144fc883e69 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/status/config.scala @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.status + +import java.util.concurrent.TimeUnit + +import org.apache.spark.internal.config._ + +private[spark] object config { + + val LIVE_ENTITY_UPDATE_PERIOD = ConfigBuilder("spark.ui.liveUpdate.period") + .timeConf(TimeUnit.NANOSECONDS) + .createWithDefaultString("100ms") + +} diff --git a/core/src/main/scala/org/apache/spark/status/storeTypes.scala b/core/src/main/scala/org/apache/spark/status/storeTypes.scala index 9579accd2cba7..a445435809f3a 100644 --- a/core/src/main/scala/org/apache/spark/status/storeTypes.scala +++ b/core/src/main/scala/org/apache/spark/status/storeTypes.scala @@ -17,12 +17,16 @@ package org.apache.spark.status +import java.lang.{Integer => JInteger, Long => JLong} + import com.fasterxml.jackson.annotation.JsonIgnore import org.apache.spark.status.KVUtils._ import org.apache.spark.status.api.v1._ import org.apache.spark.util.kvstore.KVIndex +private[spark] case class AppStatusStoreMetadata(version: Long) + private[spark] class ApplicationInfoWrapper(val info: ApplicationInfo) { @JsonIgnore @KVIndex @@ -64,13 +68,33 @@ private[spark] class StageDataWrapper( @JsonIgnore @KVIndex def id: Array[Int] = Array(info.stageId, info.attemptId) + @JsonIgnore @KVIndex("stageId") + def stageId: Int = info.stageId + } -private[spark] class TaskDataWrapper(val info: TaskData) { +/** + * The task information is always indexed with the stage ID, since that is how the UI and API + * consume it. That means every indexed value has the stage ID and attempt ID included, aside + * from the actual data being indexed. 
+ */ +private[spark] class TaskDataWrapper( + val info: TaskData, + val stageId: Int, + val stageAttemptId: Int) { @JsonIgnore @KVIndex def id: Long = info.taskId + @JsonIgnore @KVIndex("stage") + def stage: Array[Int] = Array(stageId, stageAttemptId) + + @JsonIgnore @KVIndex("runtime") + def runtime: Array[AnyRef] = { + val _runtime = info.taskMetrics.map(_.executorRunTime).getOrElse(-1L) + Array(stageId: JInteger, stageAttemptId: JInteger, _runtime: JLong) + } + } private[spark] class RDDStorageInfoWrapper(val info: RDDStorageInfo) { diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index 6e94073238a56..ee645f6bf8a7a 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -24,6 +24,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.{SecurityManager, SparkConf, SparkContext} import org.apache.spark.internal.Logging import org.apache.spark.scheduler._ +import org.apache.spark.status.AppStatusStore import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationAttemptInfo, ApplicationInfo, UIRoot} import org.apache.spark.storage.StorageStatusListener @@ -39,6 +40,7 @@ import org.apache.spark.util.Utils * Top level user interface for a Spark application. */ private[spark] class SparkUI private ( + val store: AppStatusStore, val sc: Option[SparkContext], val conf: SparkConf, securityManager: SecurityManager, @@ -51,7 +53,8 @@ private[spark] class SparkUI private ( var appName: String, val basePath: String, val lastUpdateTime: Option[Long] = None, - val startTime: Long) + val startTime: Long, + val appSparkVersion: String) extends WebUI(securityManager, securityManager.getSSLOptions("ui"), SparkUI.getUIPort(conf), conf, basePath, "SparkUI") with Logging @@ -61,8 +64,6 @@ private[spark] class SparkUI private ( var appId: String = _ - var appSparkVersion = org.apache.spark.SPARK_VERSION - private var streamingJobProgressListener: Option[SparkListener] = None /** Initialize all components of the server. 
*/ @@ -104,8 +105,12 @@ private[spark] class SparkUI private ( logInfo(s"Stopped Spark web UI at $webUrl") } - def getSparkUI(appId: String): Option[SparkUI] = { - if (appId == this.appId) Some(this) else None + override def withSparkUI[T](appId: String, attemptId: Option[String])(fn: SparkUI => T): T = { + if (appId == this.appId) { + fn(this) + } else { + throw new NoSuchElementException() + } } def getApplicationInfoList: Iterator[ApplicationInfo] = { @@ -159,63 +164,26 @@ private[spark] object SparkUI { conf.getInt("spark.ui.port", SparkUI.DEFAULT_PORT) } - def createLiveUI( - sc: SparkContext, - conf: SparkConf, - jobProgressListener: JobProgressListener, - securityManager: SecurityManager, - appName: String, - startTime: Long): SparkUI = { - create(Some(sc), conf, - sc.listenerBus.addToStatusQueue, - securityManager, appName, jobProgressListener = Some(jobProgressListener), - startTime = startTime) - } - - def createHistoryUI( - conf: SparkConf, - listenerBus: SparkListenerBus, - securityManager: SecurityManager, - appName: String, - basePath: String, - lastUpdateTime: Option[Long], - startTime: Long): SparkUI = { - val sparkUI = create(None, conf, listenerBus.addListener, securityManager, appName, basePath, - lastUpdateTime = lastUpdateTime, startTime = startTime) - - val listenerFactories = ServiceLoader.load(classOf[SparkHistoryListenerFactory], - Utils.getContextOrSparkClassLoader).asScala - listenerFactories.foreach { listenerFactory => - val listeners = listenerFactory.createListeners(conf, sparkUI) - listeners.foreach(listenerBus.addListener) - } - sparkUI - } - /** - * Create a new Spark UI. - * - * @param sc optional SparkContext; this can be None when reconstituting a UI from event logs. - * @param jobProgressListener if supplied, this JobProgressListener will be used; otherwise, the - * web UI will create and register its own JobProgressListener. + * Create a new UI backed by an AppStatusStore. 
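To make the new entry point concrete, here is a hedged sketch of how a history-server-style caller might invoke create with the signature shown below. All values and the helper itself are illustrative placeholders, not part of this patch:

    import org.apache.spark.{SecurityManager, SparkConf}
    import org.apache.spark.scheduler.SparkListenerBus
    import org.apache.spark.status.AppStatusStore
    import org.apache.spark.ui.SparkUI

    // Illustrative only: a disk-backed (history) UI has no live SparkContext, so sc = None
    // and listeners are attached to whatever bus the caller owns while replaying event logs.
    def buildHistoryUI(
        appStatusStore: AppStatusStore,
        conf: SparkConf,
        listenerBus: SparkListenerBus,
        securityManager: SecurityManager): SparkUI = {
      SparkUI.create(
        sc = None,
        store = appStatusStore,
        conf = conf,
        addListenerFn = listenerBus.addListener,  // same wiring the removed createHistoryUI used
        securityManager = securityManager,
        appName = "example app",                  // placeholder
        basePath = "/history/app-0000",           // placeholder base path
        startTime = 0L)
    }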
*/ - private def create( + def create( sc: Option[SparkContext], + store: AppStatusStore, conf: SparkConf, addListenerFn: SparkListenerInterface => Unit, securityManager: SecurityManager, appName: String, - basePath: String = "", - jobProgressListener: Option[JobProgressListener] = None, + basePath: String, + startTime: Long, lastUpdateTime: Option[Long] = None, - startTime: Long): SparkUI = { + appSparkVersion: String = org.apache.spark.SPARK_VERSION): SparkUI = { - val _jobProgressListener: JobProgressListener = jobProgressListener.getOrElse { + val jobProgressListener = sc.map(_.jobProgressListener).getOrElse { val listener = new JobProgressListener(conf) addListenerFn(listener) listener } - val environmentListener = new EnvironmentListener val storageStatusListener = new StorageStatusListener(conf) val executorsListener = new ExecutorsListener(storageStatusListener, conf) @@ -228,8 +196,9 @@ private[spark] object SparkUI { addListenerFn(storageListener) addListenerFn(operationGraphListener) - new SparkUI(sc, conf, securityManager, environmentListener, storageStatusListener, - executorsListener, _jobProgressListener, storageListener, operationGraphListener, - appName, basePath, lastUpdateTime, startTime) + new SparkUI(store, sc, conf, securityManager, environmentListener, storageStatusListener, + executorsListener, jobProgressListener, storageListener, operationGraphListener, + appName, basePath, lastUpdateTime, startTime, appSparkVersion) } + } diff --git a/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala index 6e50e84549047..44f9c566a380d 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala @@ -18,15 +18,11 @@ package org.apache.spark.deploy.history import java.util.{Date, NoSuchElementException} -import javax.servlet.Filter import javax.servlet.http.{HttpServletRequest, HttpServletResponse} import scala.collection.mutable -import scala.collection.mutable.ListBuffer import com.codahale.metrics.Counter -import com.google.common.cache.LoadingCache -import com.google.common.util.concurrent.UncheckedExecutionException import org.eclipse.jetty.servlet.ServletContextHandler import org.mockito.Matchers._ import org.mockito.Mockito._ @@ -39,23 +35,10 @@ import org.apache.spark.SparkFunSuite import org.apache.spark.internal.Logging import org.apache.spark.status.api.v1.{ApplicationAttemptInfo => AttemptInfo, ApplicationInfo} import org.apache.spark.ui.SparkUI -import org.apache.spark.util.{Clock, ManualClock, Utils} +import org.apache.spark.util.ManualClock class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar with Matchers { - /** - * subclass with access to the cache internals - * @param retainedApplications number of retained applications - */ - class TestApplicationCache( - operations: ApplicationCacheOperations = new StubCacheOperations(), - retainedApplications: Int, - clock: Clock = new ManualClock(0)) - extends ApplicationCache(operations, retainedApplications, clock) { - - def cache(): LoadingCache[CacheKey, CacheEntry] = appCache - } - /** * Stub cache operations. 
* The state is kept in a map of [[CacheKey]] to [[CacheEntry]], @@ -77,8 +60,7 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar override def getAppUI(appId: String, attemptId: Option[String]): Option[LoadedAppUI] = { logDebug(s"getAppUI($appId, $attemptId)") getAppUICount += 1 - instances.get(CacheKey(appId, attemptId)).map( e => - LoadedAppUI(e.ui, () => updateProbe(appId, attemptId, e.probeTime))) + instances.get(CacheKey(appId, attemptId)).map { e => e.loadedUI } } override def attachSparkUI( @@ -96,10 +78,9 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar attemptId: Option[String], completed: Boolean, started: Long, - ended: Long, - timestamp: Long): SparkUI = { - val ui = putAppUI(appId, attemptId, completed, started, ended, timestamp) - attachSparkUI(appId, attemptId, ui, completed) + ended: Long): LoadedAppUI = { + val ui = putAppUI(appId, attemptId, completed, started, ended) + attachSparkUI(appId, attemptId, ui.ui, completed) ui } @@ -108,23 +89,12 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar attemptId: Option[String], completed: Boolean, started: Long, - ended: Long, - timestamp: Long): SparkUI = { - val ui = newUI(appId, attemptId, completed, started, ended) - putInstance(appId, attemptId, ui, completed, timestamp) + ended: Long): LoadedAppUI = { + val ui = LoadedAppUI(newUI(appId, attemptId, completed, started, ended)) + instances(CacheKey(appId, attemptId)) = new CacheEntry(ui, completed) ui } - def putInstance( - appId: String, - attemptId: Option[String], - ui: SparkUI, - completed: Boolean, - timestamp: Long): Unit = { - instances += (CacheKey(appId, attemptId) -> - new CacheEntry(ui, completed, () => updateProbe(appId, attemptId, timestamp), timestamp)) - } - /** * Detach a reconstructed UI * @@ -146,23 +116,6 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar attached.get(CacheKey(appId, attemptId)) } - /** - * The update probe. 
- * @param appId application to probe - * @param attemptId attempt to probe - * @param updateTime timestamp of this UI load - */ - private[history] def updateProbe( - appId: String, - attemptId: Option[String], - updateTime: Long)(): Boolean = { - updateProbeCount += 1 - logDebug(s"isUpdated($appId, $attemptId, ${updateTime})") - val entry = instances.get(CacheKey(appId, attemptId)).get - val updated = entry.probeTime > updateTime - logDebug(s"entry = $entry; updated = $updated") - updated - } } /** @@ -210,15 +163,13 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar val now = clock.getTimeMillis() // add the entry - operations.putAppUI(app1, None, true, now, now, now) + operations.putAppUI(app1, None, true, now, now) // make sure its local operations.getAppUI(app1, None).get operations.getAppUICount = 0 // now expect it to be found - val cacheEntry = cache.lookupCacheEntry(app1, None) - assert(1 === cacheEntry.probeTime) - assert(cacheEntry.completed) + cache.withSparkUI(app1, None) { _ => } // assert about queries made of the operations assert(1 === operations.getAppUICount, "getAppUICount") assert(1 === operations.attachCount, "attachCount") @@ -236,8 +187,8 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar assert(0 === operations.detachCount, "attachCount") // evict the entry - operations.putAndAttach("2", None, true, time2, time2, time2) - operations.putAndAttach("3", None, true, time2, time2, time2) + operations.putAndAttach("2", None, true, time2, time2) + operations.putAndAttach("3", None, true, time2, time2) cache.get("2") cache.get("3") @@ -248,7 +199,7 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar val appId = "app1" val attemptId = Some("_01") val time3 = clock.getTimeMillis() - operations.putAppUI(appId, attemptId, false, time3, 0, time3) + operations.putAppUI(appId, attemptId, false, time3, 0) // expect an error here assertNotFound(appId, None) } @@ -256,10 +207,11 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar test("Test that if an attempt ID is set, it must be used in lookups") { val operations = new StubCacheOperations() val clock = new ManualClock(1) - implicit val cache = new ApplicationCache(operations, retainedApplications = 10, clock = clock) + implicit val cache = new ApplicationCache(operations, retainedApplications = 10, + clock = clock) val appId = "app1" val attemptId = Some("_01") - operations.putAppUI(appId, attemptId, false, clock.getTimeMillis(), 0, 0) + operations.putAppUI(appId, attemptId, false, clock.getTimeMillis(), 0) assertNotFound(appId, None) } @@ -271,50 +223,29 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar test("Incomplete apps refreshed") { val operations = new StubCacheOperations() val clock = new ManualClock(50) - val window = 500 - implicit val cache = new ApplicationCache(operations, retainedApplications = 5, clock = clock) + implicit val cache = new ApplicationCache(operations, 5, clock) val metrics = cache.metrics // add the incomplete app // add the entry val started = clock.getTimeMillis() val appId = "app1" val attemptId = Some("001") - operations.putAppUI(appId, attemptId, false, started, 0, started) - val firstEntry = cache.lookupCacheEntry(appId, attemptId) - assert(started === firstEntry.probeTime, s"timestamp in $firstEntry") - assert(!firstEntry.completed, s"entry is complete: $firstEntry") - assertMetric("lookupCount", metrics.lookupCount, 1) + val 
initialUI = operations.putAndAttach(appId, attemptId, false, started, 0) + val firstUI = cache.withSparkUI(appId, attemptId) { ui => ui } + assertMetric("lookupCount", metrics.lookupCount, 1) assert(0 === operations.updateProbeCount, "expected no update probe on that first get") - val checkTime = window * 2 - clock.setTime(checkTime) - val entry3 = cache.lookupCacheEntry(appId, attemptId) - assert(firstEntry !== entry3, s"updated entry test from $cache") + // Invalidate the first entry to trigger a re-load. + initialUI.invalidate() + + // Update the UI in the stub so that a new one is provided to the cache. + operations.putAppUI(appId, attemptId, true, started, started + 10) + + val updatedUI = cache.withSparkUI(appId, attemptId) { ui => ui } + assert(firstUI !== updatedUI, s"expected updated UI") assertMetric("lookupCount", metrics.lookupCount, 2) - assertMetric("updateProbeCount", metrics.updateProbeCount, 1) - assertMetric("updateTriggeredCount", metrics.updateTriggeredCount, 0) - assert(1 === operations.updateProbeCount, s"refresh count in $cache") - assert(0 === operations.detachCount, s"detach count") - assert(entry3.probeTime === checkTime) - - val updateTime = window * 3 - // update the cached value - val updatedApp = operations.putAppUI(appId, attemptId, true, started, updateTime, updateTime) - val endTime = window * 10 - clock.setTime(endTime) - logDebug(s"Before operation = $cache") - val entry5 = cache.lookupCacheEntry(appId, attemptId) - assertMetric("lookupCount", metrics.lookupCount, 3) - assertMetric("updateProbeCount", metrics.updateProbeCount, 2) - // the update was triggered - assertMetric("updateTriggeredCount", metrics.updateTriggeredCount, 1) - assert(updatedApp === entry5.ui, s"UI {$updatedApp} did not match entry {$entry5} in $cache") - - // at which point, the refreshes stop - clock.setTime(window * 20) - assertCacheEntryEquals(appId, attemptId, entry5) - assertMetric("updateProbeCount", metrics.updateProbeCount, 2) + assert(1 === operations.detachCount, s"detach count") } /** @@ -337,27 +268,6 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar } } - /** - * Look up the cache entry and assert that it matches in the expected value. - * This assertion works if the two CacheEntries are different -it looks at the fields. - * UI are compared on object equality; the timestamp and completed flags directly. - * @param appId application ID - * @param attemptId attempt ID - * @param expected expected value - * @param cache app cache - */ - def assertCacheEntryEquals( - appId: String, - attemptId: Option[String], - expected: CacheEntry) - (implicit cache: ApplicationCache): Unit = { - val actual = cache.lookupCacheEntry(appId, attemptId) - val errorText = s"Expected get($appId, $attemptId) -> $expected, but got $actual from $cache" - assert(expected.ui === actual.ui, errorText + " SparkUI reference") - assert(expected.completed === actual.completed, errorText + " -completed flag") - assert(expected.probeTime === actual.probeTime, errorText + " -timestamp") - } - /** * Assert that a key wasn't found in cache or loaded. 
* @@ -370,14 +280,9 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar appId: String, attemptId: Option[String]) (implicit cache: ApplicationCache): Unit = { - val ex = intercept[UncheckedExecutionException] { + val ex = intercept[NoSuchElementException] { cache.get(appId, attemptId) } - var cause = ex.getCause - assert(cause !== null) - if (!cause.isInstanceOf[NoSuchElementException]) { - throw cause - } } test("Large Scale Application Eviction") { @@ -385,12 +290,12 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar val clock = new ManualClock(0) val size = 5 // only two entries are retained, so we expect evictions to occur on lookups - implicit val cache: ApplicationCache = new TestApplicationCache(operations, - retainedApplications = size, clock = clock) + implicit val cache = new ApplicationCache(operations, retainedApplications = size, + clock = clock) val attempt1 = Some("01") - val ids = new ListBuffer[String]() + val ids = new mutable.ListBuffer[String]() // build a list of applications val count = 100 for (i <- 1 to count ) { @@ -398,7 +303,7 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar ids += appId clock.advance(10) val t = clock.getTimeMillis() - operations.putAppUI(appId, attempt1, true, t, t, t) + operations.putAppUI(appId, attempt1, true, t, t) } // now go through them in sequence reading them, expect evictions ids.foreach { id => @@ -413,20 +318,19 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar test("Attempts are Evicted") { val operations = new StubCacheOperations() - implicit val cache: ApplicationCache = new TestApplicationCache(operations, - retainedApplications = 4) + implicit val cache = new ApplicationCache(operations, 4, new ManualClock()) val metrics = cache.metrics val appId = "app1" val attempt1 = Some("01") val attempt2 = Some("02") val attempt3 = Some("03") - operations.putAppUI(appId, attempt1, true, 100, 110, 110) - operations.putAppUI(appId, attempt2, true, 200, 210, 210) - operations.putAppUI(appId, attempt3, true, 300, 310, 310) + operations.putAppUI(appId, attempt1, true, 100, 110) + operations.putAppUI(appId, attempt2, true, 200, 210) + operations.putAppUI(appId, attempt3, true, 300, 310) val attempt4 = Some("04") - operations.putAppUI(appId, attempt4, true, 400, 410, 410) + operations.putAppUI(appId, attempt4, true, 400, 410) val attempt5 = Some("05") - operations.putAppUI(appId, attempt5, true, 500, 510, 510) + operations.putAppUI(appId, attempt5, true, 500, 510) def expectLoadAndEvictionCounts(expectedLoad: Int, expectedEvictionCount: Int): Unit = { assertMetric("loadCount", metrics.loadCount, expectedLoad) @@ -457,20 +361,14 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar } - test("Instantiate Filter") { - // this is a regression test on the filter being constructable - val clazz = Utils.classForName(ApplicationCacheCheckFilterRelay.FILTER_NAME) - val instance = clazz.newInstance() - instance shouldBe a [Filter] - } - test("redirect includes query params") { - val clazz = Utils.classForName(ApplicationCacheCheckFilterRelay.FILTER_NAME) - val filter = clazz.newInstance().asInstanceOf[ApplicationCacheCheckFilter] - filter.appId = "local-123" + val operations = new StubCacheOperations() + val ui = operations.putAndAttach("foo", None, true, 0, 10) val cache = mock[ApplicationCache] - when(cache.checkForUpdates(any(), any())).thenReturn(true) - 
ApplicationCacheCheckFilterRelay.setApplicationCache(cache) + when(cache.operations).thenReturn(operations) + val filter = new ApplicationCacheCheckFilter(new CacheKey("foo", None), ui, cache) + ui.invalidate() + val request = mock[HttpServletRequest] when(request.getMethod()).thenReturn("GET") when(request.getRequestURI()).thenReturn("http://localhost:18080/history/local-123/jobs/job/") diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index 03bd3eaf579f3..86c8cdf43258c 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -41,6 +41,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.io._ import org.apache.spark.scheduler._ import org.apache.spark.security.GroupMappingServiceProvider +import org.apache.spark.status.AppStatusStore import org.apache.spark.util.{Clock, JsonProtocol, ManualClock, Utils} class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matchers with Logging { @@ -612,7 +613,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc // Manually overwrite the version in the listing db; this should cause the new provider to // discard all data because the versions don't match. val meta = new FsHistoryProviderMetadata(FsHistoryProvider.CURRENT_LISTING_VERSION + 1, - conf.get(LOCAL_STORE_DIR).get) + AppStatusStore.CURRENT_VERSION, conf.get(LOCAL_STORE_DIR).get) oldProvider.listing.setMetadata(meta) oldProvider.stop() @@ -620,6 +621,43 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc assert(mistatchedVersionProvider.listing.count(classOf[ApplicationInfoWrapper]) === 0) } + test("invalidate cached UI") { + val provider = new FsHistoryProvider(createTestConf()) + val appId = "new1" + + // Write an incomplete app log. + val appLog = newLogFile(appId, None, inProgress = true) + writeFile(appLog, true, None, + SparkListenerApplicationStart(appId, Some(appId), 1L, "test", None) + ) + provider.checkForLogs() + + // Load the app UI. + val oldUI = provider.getAppUI(appId, None) + assert(oldUI.isDefined) + intercept[NoSuchElementException] { + oldUI.get.ui.store.job(0) + } + + // Add more info to the app log, and trigger the provider to update things. + writeFile(appLog, true, None, + SparkListenerApplicationStart(appId, Some(appId), 1L, "test", None), + SparkListenerJobStart(0, 1L, Nil, null), + SparkListenerApplicationEnd(5L) + ) + provider.checkForLogs() + + // Manually detach the old UI; ApplicationCache would do this automatically in a real SHS + // when the app's UI was requested. + provider.onUIDetached(appId, None, oldUI.get.ui) + + // Load the UI again and make sure we can get the new info added to the logs. + val freshUI = provider.getAppUI(appId, None) + assert(freshUI.isDefined) + assert(freshUI != oldUI) + freshUI.get.ui.store.job(0) + } + /** * Asks the provider to check for logs and calls a function to perform checks on the updated * app list. 
Example: diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala index c11543a4b3ba2..010a8dd004d4f 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala @@ -72,6 +72,8 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers private var port: Int = -1 def init(extraConf: (String, String)*): Unit = { + Utils.deleteRecursively(storeDir) + assert(storeDir.mkdir()) val conf = new SparkConf() .set("spark.history.fs.logDirectory", logDir) .set("spark.history.fs.update.interval", "0") @@ -292,21 +294,8 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers val uiRoot = "/testwebproxybase" System.setProperty("spark.ui.proxyBase", uiRoot) - server.stop() - - val conf = new SparkConf() - .set("spark.history.fs.logDirectory", logDir) - .set("spark.history.fs.update.interval", "0") - .set("spark.testing", "true") - .set(LOCAL_STORE_DIR, storeDir.getAbsolutePath()) - - provider = new FsHistoryProvider(conf) - provider.checkForLogs() - val securityManager = HistoryServer.createSecurityManager(conf) - - server = new HistoryServer(conf, provider, securityManager, 18080) - server.initialize() - server.bind() + stop() + init() val port = server.boundPort @@ -375,8 +364,6 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers } test("incomplete apps get refreshed") { - server.stop() - implicit val webDriver: WebDriver = new HtmlUnitDriver implicit val formats = org.json4s.DefaultFormats @@ -386,6 +373,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers // a new conf is used with the background thread set and running at its fastest // allowed refresh rate (1Hz) + stop() val myConf = new SparkConf() .set("spark.history.fs.logDirectory", logDir.getAbsolutePath) .set("spark.eventLog.dir", logDir.getAbsolutePath) @@ -418,7 +406,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers } } - server = new HistoryServer(myConf, provider, securityManager, 18080) + server = new HistoryServer(myConf, provider, securityManager, 0) server.initialize() server.bind() val port = server.boundPort @@ -464,7 +452,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers rootAppPage should not be empty def getAppUI: SparkUI = { - provider.getAppUI(appId, None).get.ui + server.withSparkUI(appId, None) { ui => ui } } // selenium isn't that useful on failures...add our own reporting @@ -519,7 +507,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers getNumJobs("") should be (1) getNumJobs("/jobs") should be (1) getNumJobsRestful() should be (1) - assert(metrics.lookupCount.getCount > 1, s"lookup count too low in $metrics") + assert(metrics.lookupCount.getCount > 0, s"lookup count too low in $metrics") // dump state before the next bit of test, which is where update // checking really gets stressed diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala index 6f7a0c14dd684..7ac1ce19f8ddf 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala @@ -18,7 +18,8 @@ package 
org.apache.spark.status import java.io.File -import java.util.{Date, Properties} +import java.lang.{Integer => JInteger, Long => JLong} +import java.util.{Arrays, Date, Properties} import scala.collection.JavaConverters._ import scala.reflect.{classTag, ClassTag} @@ -36,6 +37,10 @@ import org.apache.spark.util.kvstore._ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { + import config._ + + private val conf = new SparkConf().set(LIVE_ENTITY_UPDATE_PERIOD, 0L) + private var time: Long = _ private var testDir: File = _ private var store: KVStore = _ @@ -52,7 +57,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { } test("scheduler events") { - val listener = new AppStatusListener(store) + val listener = new AppStatusListener(store, conf, true) // Start the application. time += 1 @@ -174,6 +179,14 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { s1Tasks.foreach { task => check[TaskDataWrapper](task.taskId) { wrapper => assert(wrapper.info.taskId === task.taskId) + assert(wrapper.stageId === stages.head.stageId) + assert(wrapper.stageAttemptId === stages.head.attemptId) + assert(Arrays.equals(wrapper.stage, Array(stages.head.stageId, stages.head.attemptId))) + + val runtime = Array[AnyRef](stages.head.stageId: JInteger, stages.head.attemptId: JInteger, + -1L: JLong) + assert(Arrays.equals(wrapper.runtime, runtime)) + assert(wrapper.info.index === task.index) assert(wrapper.info.attempt === task.attemptNumber) assert(wrapper.info.launchTime === new Date(task.launchTime)) @@ -510,7 +523,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { } test("storage events") { - val listener = new AppStatusListener(store) + val listener = new AppStatusListener(store, conf, true) val maxMemory = 42L // Register a couple of block managers. diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 45b8870f3b62f..99cac34c85ebc 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -38,6 +38,8 @@ object MimaExcludes { lazy val v23excludes = v22excludes ++ Seq( // SPARK-18085: Better History Server scalability for many / large applications ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.status.api.v1.ExecutorSummary.executorLogs"), + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.deploy.history.HistoryServer.getSparkUI"), + // [SPARK-20495][SQL] Add StorageLevel to cacheTable API ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.cacheTable"),
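For completeness, a hedged usage sketch of the new spark.ui.liveUpdate.period knob introduced in status/config.scala above. Given the maybeUpdate check, a value of 0 effectively writes on every event, which is why the suite above sets LIVE_ENTITY_UPDATE_PERIOD to 0L for deterministic assertions; a larger value batches UI-facing writes. The app name below is a placeholder:

    import org.apache.spark.SparkConf

    // Illustrative: relax live-entity write throttling to roughly once per second per entity.
    val conf = new SparkConf()
      .setAppName("live-update-demo")
      .set("spark.ui.liveUpdate.period", "1s")  // default is 100ms per this patch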