From 8c346a148d7be78b0f53aadb9c8ca78098b0ea6c Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Tue, 18 Apr 2017 13:38:10 -0700 Subject: [PATCH 1/5] [SPARK-20653][core] Add cleaning of old elements from the status store. This change restores the functionality that keeps a limited number of different types (jobs, stages, etc) depending on configuration, to avoid the store growing indefinitely over time. The feature is implemented by creating a new type (ElementTrackingStore) that wraps a KVStore and allows triggers to be set up for when elements of a certain type meet a certain threshold. Triggers don't need to necessarily only delete elements, but the current API is set up in a way that makes that use case easier. The new store also has a trigger for the "close" call, which makes it easier for listeners to register code for cleaning things up and flushing partial state to the store. The old configurations for cleaning up the stored elements from the core and SQL UIs are now active again, and the old unit tests are re-enabled. --- .../deploy/history/FsHistoryProvider.scala | 8 +- .../spark/internal/config/package.scala | 5 - .../spark/status/AppStatusListener.scala | 180 ++++++++++++++++-- .../apache/spark/status/AppStatusPlugin.scala | 2 +- .../apache/spark/status/AppStatusStore.scala | 6 +- .../spark/status/ElementTrackingStore.scala | 168 ++++++++++++++++ .../org/apache/spark/status/KVUtils.scala | 14 ++ .../org/apache/spark/status/LiveEntity.scala | 9 +- .../status/api/v1/OneStageResource.scala | 10 +- .../org/apache/spark/status/config.scala | 20 ++ .../org/apache/spark/status/storeTypes.scala | 16 ++ .../scala/org/apache/spark/ui/SparkUI.scala | 2 - .../apache/spark/ui/jobs/AllJobsPage.scala | 8 +- .../apache/spark/ui/jobs/AllStagesPage.scala | 8 +- .../spark/ui/jobs/JobProgressListener.scala | 6 +- .../deploy/history/HistoryServerSuite.scala | 2 +- .../spark/status/AppStatusListenerSuite.scala | 142 +++++++++++--- .../status/ElementTrackingStoreSuite.scala | 91 +++++++++ .../org/apache/spark/ui/UISeleniumSuite.scala | 8 +- .../spark/sql/internal/StaticSQLConf.scala | 7 + .../execution/ui/SQLAppStatusListener.scala | 33 +++- .../sql/execution/ui/SQLAppStatusStore.scala | 7 +- .../sql/execution/ui/SQLListenerSuite.scala | 29 +-- 23 files changed, 702 insertions(+), 79 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala create mode 100644 core/src/test/scala/org/apache/spark/status/ElementTrackingStoreSuite.scala diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 25f82b55f2003..d7875426b43c0 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -44,6 +44,7 @@ import org.apache.spark.scheduler.ReplayListenerBus._ import org.apache.spark.status._ import org.apache.spark.status.KVUtils._ import org.apache.spark.status.api.v1 +import org.apache.spark.status.config._ import org.apache.spark.ui.SparkUI import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils} import org.apache.spark.util.kvstore._ @@ -315,12 +316,13 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) (new InMemoryStore(), true) } + val trackingStore = new ElementTrackingStore(kvstore, conf) val listener = if (needReplay) { - val _listener = new AppStatusListener(kvstore, conf, false, + val _listener = 
new AppStatusListener(trackingStore, conf, false, lastUpdateTime = Some(attempt.info.lastUpdated.getTime())) replayBus.addListener(_listener) AppStatusPlugin.loadPlugins().foreach { plugin => - plugin.setupListeners(conf, kvstore, l => replayBus.addListener(l), false) + plugin.setupListeners(conf, trackingStore, l => replayBus.addListener(l), false) } Some(_listener) } else { @@ -342,7 +344,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) val fileStatus = fs.getFileStatus(new Path(logDir, attempt.logPath)) replay(fileStatus, isApplicationCompleted(fileStatus), replayBus) - listener.foreach(_.flush()) + trackingStore.close(false) } catch { case e: Exception => try { diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 57e2da8353d6d..6ca7b70341c60 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -217,11 +217,6 @@ package object config { .stringConf .createOptional - // To limit memory usage, we only track information for a fixed number of tasks - private[spark] val UI_RETAINED_TASKS = ConfigBuilder("spark.ui.retainedTasks") - .intConf - .createWithDefault(100000) - // To limit how many applications are shown in the History Server summary ui private[spark] val HISTORY_UI_MAX_APPS = ConfigBuilder("spark.history.ui.maxApplications").intConf.createWithDefault(Integer.MAX_VALUE) diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index f2d8e0a5480ba..a5386c2f8b5a8 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -19,6 +19,7 @@ package org.apache.spark.status import java.util.Date +import scala.collection.JavaConverters._ import scala.collection.mutable.HashMap import org.apache.spark._ @@ -29,7 +30,6 @@ import org.apache.spark.status.api.v1 import org.apache.spark.storage._ import org.apache.spark.ui.SparkUI import org.apache.spark.ui.scope._ -import org.apache.spark.util.kvstore.KVStore /** * A Spark listener that writes application information to a data store. The types written to the @@ -39,7 +39,7 @@ import org.apache.spark.util.kvstore.KVStore * unfinished tasks can be more accurately calculated (see SPARK-21922). */ private[spark] class AppStatusListener( - kvstore: KVStore, + kvstore: ElementTrackingStore, conf: SparkConf, live: Boolean, lastUpdateTime: Option[Long] = None) extends SparkListener with Logging { @@ -48,6 +48,7 @@ private[spark] class AppStatusListener( private var sparkVersion = SPARK_VERSION private var appInfo: v1.ApplicationInfo = null + private var appSummary = new AppSummary(0, 0) private var coresPerTask: Int = 1 // How often to update live entities. -1 means "never update" when replaying applications, @@ -55,6 +56,7 @@ private[spark] class AppStatusListener( // operations that we can live without when rapidly processing incoming task events. 
private val liveUpdatePeriodNs = if (live) conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L + private val maxTasksPerStage = conf.get(MAX_RETAINED_TASKS_PER_STAGE) private val maxGraphRootNodes = conf.get(MAX_RETAINED_ROOT_NODES) // Keep track of live entities, so that task metrics can be efficiently updated (without @@ -65,10 +67,25 @@ private[spark] class AppStatusListener( private val liveTasks = new HashMap[Long, LiveTask]() private val liveRDDs = new HashMap[Int, LiveRDD]() private val pools = new HashMap[String, SchedulerPool]() + // Keep the active executor count as a separate variable to avoid having to do synchronization + // around liveExecutors. + @volatile private var activeExecutorCount = 0 - override def onOtherEvent(event: SparkListenerEvent): Unit = event match { - case SparkListenerLogStart(version) => sparkVersion = version - case _ => + kvstore.addTrigger(classOf[ExecutorSummaryWrapper], conf.get(MAX_RETAINED_DEAD_EXECUTORS)) + { count => cleanupExecutors(count) } + + kvstore.addTrigger(classOf[JobDataWrapper], conf.get(MAX_RETAINED_JOBS)) { count => + cleanupJobs(count) + } + + kvstore.addTrigger(classOf[StageDataWrapper], conf.get(MAX_RETAINED_STAGES)) { count => + cleanupStages(count) + } + + kvstore.onFlush { + if (!live) { + flush() + } } override def onApplicationStart(event: SparkListenerApplicationStart): Unit = { @@ -94,6 +111,7 @@ private[spark] class AppStatusListener( Seq(attempt)) kvstore.write(new ApplicationInfoWrapper(appInfo)) + kvstore.write(appSummary) } override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate): Unit = { @@ -155,10 +173,11 @@ private[spark] class AppStatusListener( override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit = { liveExecutors.remove(event.executorId).foreach { exec => val now = System.nanoTime() + activeExecutorCount = math.max(0, activeExecutorCount - 1) exec.isActive = false exec.removeTime = new Date(event.time) exec.removeReason = event.reason - update(exec, now) + update(exec, now, last = true) // Remove all RDD distributions that reference the removed executor, in case there wasn't // a corresponding event. 
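For reference, the trigger wiring added in the hunk above (addTrigger, onFlush, and writes that request trigger checks) can be exercised on its own. A minimal sketch, assuming code that lives under the org.apache.spark package (ElementTrackingStore is private[spark]); the JobRecord type and the println actions are hypothetical, only the ElementTrackingStore calls come from this patch:

import org.apache.spark.SparkConf
import org.apache.spark.status.ElementTrackingStore
import org.apache.spark.status.config._
import org.apache.spark.util.kvstore.{InMemoryStore, KVIndex}

// Hypothetical element type; it needs a natural key exposed via @KVIndex to be written.
class JobRecord(val id: Int) {
  @KVIndex def key: Int = id
}

// Run triggers on the calling thread so the example is deterministic.
val conf = new SparkConf().set(ASYNC_TRACKING_ENABLED, false)
val store = new ElementTrackingStore(new InMemoryStore(), conf)

// Fires only while more than 2 JobRecords are stored; the action receives the current count.
store.addTrigger(classOf[JobRecord], 2) { count =>
  println(s"would evict ${count - 2} old job records")
}

// Flush triggers run synchronously when the store is closed.
store.onFlush { println("flushing partial state") }

(1 to 3).foreach { i => store.write(new JobRecord(i), checkTriggers = true) }
store.close(false)  // runs flush triggers but leaves the wrapped InMemoryStore open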
@@ -285,8 +304,11 @@ private[spark] class AppStatusListener( } job.completionTime = if (event.time > 0) Some(new Date(event.time)) else None - update(job, now) + update(job, now, last = true) } + + appSummary = new AppSummary(appSummary.numCompletedJobs + 1, appSummary.numCompletedStages) + kvstore.write(appSummary) } override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = { @@ -345,6 +367,13 @@ private[spark] class AppStatusListener( job.activeTasks += 1 maybeUpdate(job, now) } + + if (stage.savedTasks.incrementAndGet() > maxTasksPerStage && !stage.cleaning) { + stage.cleaning = true + kvstore.doAsync { + cleanupTasks(stage) + } + } } liveExecutors.get(event.taskInfo.executorId).foreach { exec => @@ -444,6 +473,13 @@ private[spark] class AppStatusListener( esummary.metrics.update(metricsDelta) } maybeUpdate(esummary, now) + + if (!stage.cleaning && stage.savedTasks.get() > maxTasksPerStage) { + stage.cleaning = true + kvstore.doAsync { + cleanupTasks(stage) + } + } } liveExecutors.get(event.taskInfo.executorId).foreach { exec => @@ -504,8 +540,11 @@ private[spark] class AppStatusListener( } stage.executorSummaries.values.foreach(update(_, now)) - update(stage, now) + update(stage, now, last = true) } + + appSummary = new AppSummary(appSummary.numCompletedJobs, appSummary.numCompletedStages + 1) + kvstore.write(appSummary) } override def onBlockManagerAdded(event: SparkListenerBlockManagerAdded): Unit = { @@ -561,7 +600,7 @@ private[spark] class AppStatusListener( } /** Flush all live entities' data to the underlying store. */ - def flush(): Unit = { + private def flush(): Unit = { val now = System.nanoTime() liveStages.values.foreach { stage => update(stage, now) @@ -684,7 +723,10 @@ private[spark] class AppStatusListener( } private def getOrCreateExecutor(executorId: String, addTime: Long): LiveExecutor = { - liveExecutors.getOrElseUpdate(executorId, new LiveExecutor(executorId, addTime)) + liveExecutors.getOrElseUpdate(executorId, { + activeExecutorCount += 1 + new LiveExecutor(executorId, addTime) + }) } private def updateStreamBlock(event: SparkListenerBlockUpdated, stream: StreamBlockId): Unit = { @@ -727,8 +769,8 @@ private[spark] class AppStatusListener( } } - private def update(entity: LiveEntity, now: Long): Unit = { - entity.write(kvstore, now) + private def update(entity: LiveEntity, now: Long, last: Boolean = false): Unit = { + entity.write(kvstore, now, checkTriggers = last) } /** Update a live entity only if it hasn't been updated in the last configured period. */ @@ -745,4 +787,118 @@ private[spark] class AppStatusListener( } } + private def cleanupExecutors(count: Long): Unit = { + // Because the limit is on the number of *dead* executors, we need to calculate whether + // there are actually enough dead executors to be deleted. 
+ val threshold = conf.get(MAX_RETAINED_DEAD_EXECUTORS) + val dead = count - activeExecutorCount + + if (dead > threshold) { + val countToDelete = calculateNumberToRemove(dead, threshold) + val toDelete = kvstore.view(classOf[ExecutorSummaryWrapper]).index("active") + .max(countToDelete).first(false).last(false).asScala.toSeq + toDelete.foreach { e => kvstore.delete(e.getClass(), e.info.id) } + } + } + + private def cleanupJobs(count: Long): Unit = { + val countToDelete = calculateNumberToRemove(count, conf.get(MAX_RETAINED_JOBS)) + if (countToDelete <= 0L) { + return + } + + val toDelete = KVUtils.viewToSeq(kvstore.view(classOf[JobDataWrapper]), + countToDelete.toInt) { j => + j.info.status != JobExecutionStatus.RUNNING && j.info.status != JobExecutionStatus.UNKNOWN + } + toDelete.foreach { j => kvstore.delete(j.getClass(), j.info.jobId) } + } + + private def cleanupStages(count: Long): Unit = { + val countToDelete = calculateNumberToRemove(count, conf.get(MAX_RETAINED_STAGES)) + if (countToDelete <= 0L) { + return + } + + val stages = KVUtils.viewToSeq(kvstore.view(classOf[StageDataWrapper]), + countToDelete.toInt) { s => + s.info.status != v1.StageStatus.ACTIVE && s.info.status != v1.StageStatus.PENDING + } + + stages.foreach { s => + val key = s.id + kvstore.delete(s.getClass(), key) + + val execSummaries = kvstore.view(classOf[ExecutorStageSummaryWrapper]) + .index("stage") + .first(key) + .last(key) + .asScala + .toSeq + execSummaries.foreach { e => + kvstore.delete(e.getClass(), e.id) + } + + val tasks = kvstore.view(classOf[TaskDataWrapper]) + .index("stage") + .first(key) + .last(key) + .asScala + + tasks.foreach { t => + kvstore.delete(t.getClass(), t.info.taskId) + } + + // Check whether there are remaining attempts for the same stage. If there aren't, then + // also delete the RDD graph data. + val remainingAttempts = kvstore.view(classOf[StageDataWrapper]) + .index("stageId") + .first(s.stageId) + .last(s.stageId) + .closeableIterator() + + val hasMoreAttempts = try { + remainingAttempts.asScala.exists { other => + other.info.attemptId != s.info.attemptId + } + } finally { + remainingAttempts.close() + } + + if (!hasMoreAttempts) { + kvstore.delete(classOf[RDDOperationGraphWrapper], s.stageId) + } + } + } + + private def cleanupTasks(stage: LiveStage): Unit = { + val countToDelete = calculateNumberToRemove(stage.savedTasks.get(), maxTasksPerStage) + if (countToDelete > 0L) { + val stageKey = Array(stage.info.stageId, stage.info.attemptId) + val view = kvstore.view(classOf[TaskDataWrapper]).index("stage").first(stageKey) + .last(stageKey) + + // On live applications, try to delete finished tasks only; when in the SHS, treat all + // tasks as the same. + val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt) { t => + !live || t.info.status != TaskState.RUNNING.toString() + } + toDelete.foreach { t => kvstore.delete(t.getClass(), t.info.taskId) } + stage.savedTasks.addAndGet(-toDelete.size) + } + stage.cleaning = false + } + + /** + * Remove at least (retainedSize / 10) items to reduce friction. Because tracking may be done + * asynchronously, this method may return 0 in case enough items have been deleted already. 
+ */ + private def calculateNumberToRemove(dataSize: Long, retainedSize: Long): Long = { + if (dataSize > retainedSize) { + math.max(retainedSize / 10L, dataSize - retainedSize) + } else { + 0L + } + } + } diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusPlugin.scala b/core/src/main/scala/org/apache/spark/status/AppStatusPlugin.scala index 69ca02ec76293..4cada5c7b0de4 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusPlugin.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusPlugin.scala @@ -48,7 +48,7 @@ private[spark] trait AppStatusPlugin { */ def setupListeners( conf: SparkConf, - store: KVStore, + store: ElementTrackingStore, addListenerFn: SparkListener => Unit, live: Boolean): Unit diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala index d0615e5dd0223..b55516d79b635 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala @@ -320,6 +320,10 @@ private[spark] class AppStatusStore(val store: KVStore) { store.read(classOf[PoolData], name) } + def appSummary(): AppSummary = { + store.read(classOf[AppSummary], classOf[AppSummary].getName()) + } + def close(): Unit = { store.close() } @@ -337,7 +341,7 @@ private[spark] object AppStatusStore { * @param addListenerFn Function to register a listener with a bus. */ def createLiveStore(conf: SparkConf, addListenerFn: SparkListener => Unit): AppStatusStore = { - val store = new InMemoryStore() + val store = new ElementTrackingStore(new InMemoryStore(), conf) addListenerFn(new AppStatusListener(store, conf, true)) AppStatusPlugin.loadPlugins().foreach { p => p.setupListeners(conf, store, addListenerFn, true) diff --git a/core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala b/core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala new file mode 100644 index 0000000000000..282ed35d9b528 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.status + +import java.util.concurrent.TimeUnit + +import scala.collection.mutable.{HashMap, ListBuffer} + +import org.apache.spark.SparkConf +import org.apache.spark.util.{ThreadUtils, Utils} +import org.apache.spark.util.kvstore._ + +/** + * A KVStore wrapper that allows tracking the number of elements of specific types, and triggering + * actions once they reach a threshold. This allows writers, for example, to control how much data + * is stored by potentially deleting old data as new data is added. 
+ * + * This store is used when populating data either from a live UI or an event log. On top of firing + * triggers when elements reach a certain threshold, it provides two extra bits of functionality: + * + * - a generic worker thread that can be used to run expensive tasks asynchronously; the tasks can + * be configured to run on the calling thread when more determinism is desired (e.g. unit tests). + * - a generic flush mechanism so that listeners can be notified about when they should flush + * internal state to the store (e.g. after the SHS finishes parsing an event log). + * + * The configured triggers are run on the same thread that triggered the write, after the write + * has completed. + */ +private[spark] class ElementTrackingStore(store: KVStore, conf: SparkConf) extends KVStore { + + import config._ + + private val triggers = new HashMap[Class[_], Seq[Trigger[_]]]() + private val flushTriggers = new ListBuffer[() => Unit]() + private val executor = if (conf.get(ASYNC_TRACKING_ENABLED)) { + Some(ThreadUtils.newDaemonSingleThreadExecutor("element-tracking-store-worker")) + } else { + None + } + + @volatile private var stopped = false + + /** + * Register a trigger that will be fired once the number of elements of a given type reaches + * the given threshold. + * + * Triggers are fired in a separate thread, so that they can do more expensive operations + * than would be allowed on the main threads populating the store. + * + * @param klass The type to monitor. + * @param threshold The number of elements that should trigger the action. + * @param action Action to run when the threshold is reached; takes as a parameter the number + * of elements of the registered type currently known to be in the store. + */ + def addTrigger(klass: Class[_], threshold: Long)(action: Long => Unit): Unit = { + val existing = triggers.getOrElse(klass, Seq()) + triggers(klass) = existing :+ Trigger(threshold, action) + } + + /** + * Adds a trigger to be executed before the store is flushed. This normally happens before + * closing, and is useful for flushing intermediate state to the store, e.g. when replaying + * in-progress applications through the SHS. + * + * Flush triggers are called synchronously in the same thread that is closing the store. + */ + def onFlush(action: => Unit): Unit = { + flushTriggers += { () => action } + } + + /** + * Enqueues an action to be executed asynchronously. + */ + def doAsync(fn: => Unit): Unit = { + executor match { + case Some(exec) => + exec.submit(new Runnable() { + override def run(): Unit = Utils.tryLog { fn } + }) + + case _ => + fn + } + } + + override def read[T](klass: Class[T], naturalKey: Any): T = store.read(klass, naturalKey) + + override def write(value: Any): Unit = store.write(value) + + /** Write an element to the store, optionally checking for whether to fire triggers. 
*/ + def write(value: Any, checkTriggers: Boolean): Unit = { + write(value) + + if (checkTriggers && !stopped) { + triggers.get(value.getClass()).foreach { list => + doAsync { + val count = store.count(value.getClass()) + list.foreach { t => + if (count > t.threshold) { + t.action(count) + } + } + } + } + } + } + + override def delete(klass: Class[_], naturalKey: Any): Unit = store.delete(klass, naturalKey) + + override def getMetadata[T](klass: Class[T]): T = store.getMetadata(klass) + + override def setMetadata(value: Any): Unit = store.setMetadata(value) + + override def view[T](klass: Class[T]): KVStoreView[T] = store.view(klass) + + override def count(klass: Class[_]): Long = store.count(klass) + + override def count(klass: Class[_], index: String, indexedValue: Any): Long = { + store.count(klass, index, indexedValue) + } + + override def close(): Unit = { + close(true) + } + + /** A close() method that optionally leaves the parent store open. */ + def close(closeParent: Boolean): Unit = synchronized { + if (stopped) { + return + } + + stopped = true + executor.foreach { exec => + exec.shutdown() + if (!exec.awaitTermination(5, TimeUnit.SECONDS)) { + exec.shutdownNow() + } + } + + flushTriggers.foreach { trigger => + Utils.tryLog(trigger()) + } + + if (closeParent) { + store.close() + } + } + + private case class Trigger[T]( + threshold: Long, + action: Long => Unit) + +} diff --git a/core/src/main/scala/org/apache/spark/status/KVUtils.scala b/core/src/main/scala/org/apache/spark/status/KVUtils.scala index 4638511944c61..99b1843d8e1c0 100644 --- a/core/src/main/scala/org/apache/spark/status/KVUtils.scala +++ b/core/src/main/scala/org/apache/spark/status/KVUtils.scala @@ -20,6 +20,7 @@ package org.apache.spark.status import java.io.File import scala.annotation.meta.getter +import scala.collection.JavaConverters._ import scala.language.implicitConversions import scala.reflect.{classTag, ClassTag} @@ -68,6 +69,19 @@ private[spark] object KVUtils extends Logging { db } + /** Turns a KVStoreView into a Scala sequence, applying a filter. */ + def viewToSeq[T]( + view: KVStoreView[T], + max: Int) + (filter: T => Boolean): Seq[T] = { + val iter = view.closeableIterator() + try { + iter.asScala.filter(filter).take(max).toList + } finally { + iter.close() + } + } + private[spark] class MetadataMismatchException extends Exception } diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala index ef2936c9b69a4..72ada91eaf825 100644 --- a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala +++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala @@ -18,6 +18,7 @@ package org.apache.spark.status import java.util.Date +import java.util.concurrent.atomic.AtomicInteger import scala.collection.mutable.HashMap @@ -40,8 +41,8 @@ private[spark] abstract class LiveEntity { var lastWriteTime = 0L - def write(store: KVStore, now: Long): Unit = { - store.write(doUpdate()) + def write(store: ElementTrackingStore, now: Long, checkTriggers: Boolean = false): Unit = { + store.write(doUpdate(), checkTriggers || lastWriteTime == 0L) lastWriteTime = now } @@ -403,6 +404,10 @@ private class LiveStage extends LiveEntity { val executorSummaries = new HashMap[String, LiveExecutorStageSummary]() + // Used for cleanup of tasks after they reach the configured limit. Not written to the store. 
+ @volatile var cleaning = false + var savedTasks = new AtomicInteger(0) + def executorSummary(executorId: String): LiveExecutorStageSummary = { executorSummaries.getOrElseUpdate(executorId, new LiveExecutorStageSummary(info.stageId, info.attemptId, executorId)) diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/OneStageResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/OneStageResource.scala index 20dd73e916613..735010848e880 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/OneStageResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/OneStageResource.scala @@ -52,7 +52,15 @@ private[v1] class OneStageResource(ui: SparkUI) { ui.store.stageAttempt(stageId, stageAttemptId, details = details) } catch { case _: NoSuchElementException => - throw new NotFoundException(s"unknown attempt $stageAttemptId for stage $stageId.") + // Change the message depending on whether there are any attempts for the requested stage. + val all = ui.store.stageData(stageId) + val msg = if (all.nonEmpty) { + val ids = all.map(_.attemptId) + s"unknown attempt for stage $stageId. Found attempts: [${ids.mkString(",")}]" + } else { + s"unknown stage: $stageId" + } + throw new NotFoundException(msg) } } diff --git a/core/src/main/scala/org/apache/spark/status/config.scala b/core/src/main/scala/org/apache/spark/status/config.scala index 7af9dff977a86..67801b8f046f4 100644 --- a/core/src/main/scala/org/apache/spark/status/config.scala +++ b/core/src/main/scala/org/apache/spark/status/config.scala @@ -23,10 +23,30 @@ import org.apache.spark.internal.config._ private[spark] object config { + val ASYNC_TRACKING_ENABLED = ConfigBuilder("spark.appStateStore.asyncTracking.enable") + .booleanConf + .createWithDefault(true) + val LIVE_ENTITY_UPDATE_PERIOD = ConfigBuilder("spark.ui.liveUpdate.period") .timeConf(TimeUnit.NANOSECONDS) .createWithDefaultString("100ms") + val MAX_RETAINED_JOBS = ConfigBuilder("spark.ui.retainedJobs") + .intConf + .createWithDefault(1000) + + val MAX_RETAINED_STAGES = ConfigBuilder("spark.ui.retainedStages") + .intConf + .createWithDefault(1000) + + val MAX_RETAINED_TASKS_PER_STAGE = ConfigBuilder("spark.ui.retainedTasks") + .intConf + .createWithDefault(100000) + + val MAX_RETAINED_DEAD_EXECUTORS = ConfigBuilder("spark.ui.retainedDeadExecutors") + .intConf + .createWithDefault(100) + val MAX_RETAINED_ROOT_NODES = ConfigBuilder("spark.ui.dagGraph.retainedRootRDDs") .intConf .createWithDefault(Int.MaxValue) diff --git a/core/src/main/scala/org/apache/spark/status/storeTypes.scala b/core/src/main/scala/org/apache/spark/status/storeTypes.scala index c1ea87542d6cc..d9ead0071d3bf 100644 --- a/core/src/main/scala/org/apache/spark/status/storeTypes.scala +++ b/core/src/main/scala/org/apache/spark/status/storeTypes.scala @@ -112,6 +112,9 @@ private[spark] class TaskDataWrapper( Array(stageId: JInteger, stageAttemptId: JInteger, info.launchTime.getTime(): JLong) } + @JsonIgnore @KVIndex("active") + def active: Boolean = info.duration.isEmpty + } private[spark] class RDDStorageInfoWrapper(val info: RDDStorageInfo) { @@ -187,3 +190,16 @@ private[spark] class RDDOperationGraphWrapper( private[spark] class PoolData( @KVIndexParam val name: String, val stageIds: Set[Int]) + +/** + * A class with information about an app, to be used by the UI. There's only one instance of + * this summary per application, so its ID in the store is the class name. 
+ */ +private[spark] class AppSummary( + val numCompletedJobs: Int, + val numCompletedStages: Int) { + + @KVIndex + def id: String = classOf[AppSummary].getName() + +} diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index 35da3c3bfd1a2..b44ac0ea1febc 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -154,8 +154,6 @@ private[spark] object SparkUI { val DEFAULT_PORT = 4040 val STATIC_RESOURCE_DIR = "org/apache/spark/ui/static" val DEFAULT_POOL_NAME = "default" - val DEFAULT_RETAINED_STAGES = 1000 - val DEFAULT_RETAINED_JOBS = 1000 def getUIPort(conf: SparkConf): Int = { conf.getInt("spark.ui.port", SparkUI.DEFAULT_PORT) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala index b60d39b21b4bf..37e3b3b304a63 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala @@ -300,7 +300,13 @@ private[ui] class AllJobsPage(parent: JobsTab, store: AppStatusStore) extends We val shouldShowCompletedJobs = completedJobs.nonEmpty val shouldShowFailedJobs = failedJobs.nonEmpty - val completedJobNumStr = s"${completedJobs.size}" + val appSummary = store.appSummary() + val completedJobNumStr = if (completedJobs.size == appSummary.numCompletedJobs) { + s"${completedJobs.size}" + } else { + s"${appSummary.numCompletedJobs}, only showing ${completedJobs.size}" + } + val schedulingMode = store.environmentInfo().sparkProperties.toMap .get("spark.scheduler.mode") .map { mode => SchedulingMode.withName(mode).toString } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala index e4cf99e7b9e04..b1e343451e28e 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala @@ -39,7 +39,6 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") { val completedStages = allStages.filter(_.status == StageStatus.COMPLETE) val failedStages = allStages.filter(_.status == StageStatus.FAILED).reverse - val numCompletedStages = completedStages.size val numFailedStages = failedStages.size val subPath = "stages" @@ -69,10 +68,11 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") { val shouldShowCompletedStages = completedStages.nonEmpty val shouldShowFailedStages = failedStages.nonEmpty - val completedStageNumStr = if (numCompletedStages == completedStages.size) { - s"$numCompletedStages" + val appSummary = parent.store.appSummary() + val completedStageNumStr = if (appSummary.numCompletedStages == completedStages.size) { + s"${appSummary.numCompletedStages}" } else { - s"$numCompletedStages, only showing ${completedStages.size}" + s"${appSummary.numCompletedStages}, only showing ${completedStages.size}" } val summary: NodeSeq = diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index a18e86ec0a73b..0210146fe4427 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -93,9 +93,9 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { // To limit the total 
memory usage of JobProgressListener, we only track information for a fixed // number of non-active jobs and stages (there is no limit for active jobs and stages): - val retainedStages = conf.getInt("spark.ui.retainedStages", SparkUI.DEFAULT_RETAINED_STAGES) - val retainedJobs = conf.getInt("spark.ui.retainedJobs", SparkUI.DEFAULT_RETAINED_JOBS) - val retainedTasks = conf.get(UI_RETAINED_TASKS) + val retainedStages = conf.getInt("spark.ui.retainedStages", 1000) + val retainedJobs = conf.getInt("spark.ui.retainedJobs", 1000) + val retainedTasks = conf.getInt("spark.ui.retainedTasks", 100000) // We can test for memory leaks by ensuring that collections that track non-active jobs and // stages do not grow without bound and that collections for active jobs/stages eventually become diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala index d22a19e8af74a..6586a7c37f8fd 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala @@ -263,7 +263,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers val badStageAttemptId = getContentAndCode("applications/local-1422981780767/stages/1/1") badStageAttemptId._1 should be (HttpServletResponse.SC_NOT_FOUND) - badStageAttemptId._3 should be (Some("unknown attempt 1 for stage 1.")) + badStageAttemptId._3 should be (Some("unknown attempt for stage 1. Found attempts: [0]")) val badStageId2 = getContentAndCode("applications/local-1422981780767/stages/flimflam") badStageId2._1 should be (HttpServletResponse.SC_NOT_FOUND) diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala index 88fe6bd70a14e..9c08a0ae1d951 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala @@ -39,16 +39,20 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { import config._ - private val conf = new SparkConf().set(LIVE_ENTITY_UPDATE_PERIOD, 0L) + private val conf = new SparkConf() + .set(LIVE_ENTITY_UPDATE_PERIOD, 0L) + .set(ASYNC_TRACKING_ENABLED, false) private var time: Long = _ private var testDir: File = _ - private var store: KVStore = _ + private var store: ElementTrackingStore = _ + private var taskIdTracker = -1L before { time = 0L testDir = Utils.createTempDir() - store = KVUtils.open(testDir, getClass().getName()) + store = new ElementTrackingStore(KVUtils.open(testDir, getClass().getName()), conf) + taskIdTracker = -1L } after { @@ -185,22 +189,8 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { // Start tasks from stage 1 time += 1 - var _taskIdTracker = -1L - def nextTaskId(): Long = { - _taskIdTracker += 1 - _taskIdTracker - } - - def createTasks(count: Int, time: Long): Seq[TaskInfo] = { - (1 to count).map { id => - val exec = execIds(id.toInt % execIds.length) - val taskId = nextTaskId() - new TaskInfo(taskId, taskId.toInt, 1, time, exec, s"$exec.example.com", - TaskLocality.PROCESS_LOCAL, id % 2 == 0) - } - } - val s1Tasks = createTasks(4, time) + val s1Tasks = createTasks(4, execIds) s1Tasks.foreach { task => listener.onTaskStart(SparkListenerTaskStart(stages.head.stageId, stages.head.attemptId, task)) } @@ -419,7 +409,7 @@ class AppStatusListenerSuite 
extends SparkFunSuite with BeforeAndAfter { // Start and fail all tasks of stage 2. time += 1 - val s2Tasks = createTasks(4, time) + val s2Tasks = createTasks(4, execIds) s2Tasks.foreach { task => listener.onTaskStart(SparkListenerTaskStart(stages.last.stageId, stages.last.attemptId, task)) } @@ -470,7 +460,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { listener.onStageSubmitted(SparkListenerStageSubmitted(newS2, jobProps)) assert(store.count(classOf[StageDataWrapper]) === 3) - val newS2Tasks = createTasks(4, time) + val newS2Tasks = createTasks(4, execIds) newS2Tasks.foreach { task => listener.onTaskStart(SparkListenerTaskStart(newS2.stageId, newS2.attemptId, task)) @@ -526,7 +516,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { assert(store.count(classOf[StageDataWrapper]) === 5) time += 1 - val j2s2Tasks = createTasks(4, time) + val j2s2Tasks = createTasks(4, execIds) j2s2Tasks.foreach { task => listener.onTaskStart(SparkListenerTaskStart(j2Stages.last.stageId, j2Stages.last.attemptId, @@ -587,8 +577,9 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { } // Stop executors. - listener.onExecutorRemoved(SparkListenerExecutorRemoved(41L, "1", "Test")) - listener.onExecutorRemoved(SparkListenerExecutorRemoved(41L, "2", "Test")) + time += 1 + listener.onExecutorRemoved(SparkListenerExecutorRemoved(time, "1", "Test")) + listener.onExecutorRemoved(SparkListenerExecutorRemoved(time, "2", "Test")) Seq("1", "2").foreach { id => check[ExecutorSummaryWrapper](id) { exec => @@ -851,6 +842,97 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { } } + test("eviction of old data") { + val testConf = conf.clone() + .set(MAX_RETAINED_JOBS, 2) + .set(MAX_RETAINED_STAGES, 2) + .set(MAX_RETAINED_TASKS_PER_STAGE, 2) + .set(MAX_RETAINED_DEAD_EXECUTORS, 1) + val listener = new AppStatusListener(store, testConf, true) + + // Start 3 jobs, all should be kept. Stop one, it should be evicted. + time += 1 + listener.onJobStart(SparkListenerJobStart(1, time, Nil, null)) + listener.onJobStart(SparkListenerJobStart(2, time, Nil, null)) + listener.onJobStart(SparkListenerJobStart(3, time, Nil, null)) + assert(store.count(classOf[JobDataWrapper]) === 3) + + time += 1 + listener.onJobEnd(SparkListenerJobEnd(2, time, JobSucceeded)) + assert(store.count(classOf[JobDataWrapper]) === 2) + intercept[NoSuchElementException] { + store.read(classOf[JobDataWrapper], 2) + } + + // Start 3 stages, all should be kept. Stop 2 of them, the oldest stopped one should be + // deleted. Start a new attempt of the second stopped one, and verify that the stage graph + // data is not deleted. + time += 1 + val stages = Seq( + new StageInfo(1, 0, "stage1", 4, Nil, Nil, "details1"), + new StageInfo(2, 0, "stage2", 4, Nil, Nil, "details2"), + new StageInfo(3, 0, "stage3", 4, Nil, Nil, "details3")) + + // Graph data is generated by the job start event, so fire it. 
+ listener.onJobStart(SparkListenerJobStart(4, time, stages, null)) + + stages.foreach { s => + time += 1 + s.submissionTime = Some(time) + listener.onStageSubmitted(SparkListenerStageSubmitted(s, new Properties())) + } + + assert(store.count(classOf[StageDataWrapper]) === 3) + assert(store.count(classOf[RDDOperationGraphWrapper]) === 3) + + stages.drop(1).foreach { s => + time += 1 + s.completionTime = Some(time) + listener.onStageCompleted(SparkListenerStageCompleted(s)) + } + + assert(store.count(classOf[StageDataWrapper]) === 2) + assert(store.count(classOf[RDDOperationGraphWrapper]) === 2) + intercept[NoSuchElementException] { + store.read(classOf[StageDataWrapper], Array(2, 0)) + } + + val attempt2 = new StageInfo(3, 1, "stage3", 4, Nil, Nil, "details3") + time += 1 + attempt2.submissionTime = Some(time) + listener.onStageSubmitted(SparkListenerStageSubmitted(attempt2, new Properties())) + + assert(store.count(classOf[StageDataWrapper]) === 2) + assert(store.count(classOf[RDDOperationGraphWrapper]) === 2) + intercept[NoSuchElementException] { + store.read(classOf[StageDataWrapper], Array(2, 0)) + } + intercept[NoSuchElementException] { + store.read(classOf[StageDataWrapper], Array(3, 0)) + } + store.read(classOf[StageDataWrapper], Array(3, 1)) + + // Start 3 tasks and stop two of them. The oldest should be deleted. + time += 1 + val tasks = createTasks(3, Array("1")) + tasks.foreach { task => + listener.onTaskStart(SparkListenerTaskStart(attempt2.stageId, attempt2.attemptId, task)) + } + assert(store.count(classOf[TaskDataWrapper]) === 3) + + tasks.drop(1).foreach { task => + time += 1 + task.markFinished(TaskState.FINISHED, time) + listener.onTaskEnd(SparkListenerTaskEnd(attempt2.stageId, attempt2.attemptId, + "taskType", TaskResultLost, task, null)) + } + assert(store.count(classOf[TaskDataWrapper]) === 2) + intercept[NoSuchElementException] { + store.read(classOf[TaskDataWrapper], tasks.drop(1).head.id) + } + + } + private def key(stage: StageInfo): Array[Int] = Array(stage.stageId, stage.attemptId) private def check[T: ClassTag](key: Any)(fn: T => Unit): Unit = { @@ -864,6 +946,20 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { s"${orig.executorId}.example.com", TaskLocality.PROCESS_LOCAL, orig.speculative) } + private def createTasks(count: Int, execs: Array[String]): Seq[TaskInfo] = { + (1 to count).map { id => + val exec = execs(id.toInt % execs.length) + val taskId = nextTaskId() + new TaskInfo(taskId, taskId.toInt, 1, time, exec, s"$exec.example.com", + TaskLocality.PROCESS_LOCAL, id % 2 == 0) + } + } + + private def nextTaskId(): Long = { + taskIdTracker += 1 + taskIdTracker + } + private case class RddBlock( rddId: Int, partId: Int, diff --git a/core/src/test/scala/org/apache/spark/status/ElementTrackingStoreSuite.scala b/core/src/test/scala/org/apache/spark/status/ElementTrackingStoreSuite.scala new file mode 100644 index 0000000000000..07a7b58404c29 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/status/ElementTrackingStoreSuite.scala @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.status + +import org.mockito.Mockito._ + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.util.kvstore._ + +class ElementTrackingStoreSuite extends SparkFunSuite { + + import config._ + + test("tracking for multiple types") { + val store = mock(classOf[KVStore]) + val tracking = new ElementTrackingStore(store, new SparkConf() + .set(ASYNC_TRACKING_ENABLED, false)) + + var type1 = 0L + var type2 = 0L + var flushed = false + + tracking.addTrigger(classOf[Type1], 100) { count => + type1 = count + } + tracking.addTrigger(classOf[Type2], 1000) { count => + type2 = count + } + tracking.onFlush { + flushed = true + } + + when(store.count(classOf[Type1])).thenReturn(1L) + tracking.write(new Type1, true) + assert(type1 === 0L) + assert(type2 === 0L) + + when(store.count(classOf[Type1])).thenReturn(100L) + tracking.write(new Type1, true) + assert(type1 === 0L) + assert(type2 === 0L) + + when(store.count(classOf[Type1])).thenReturn(101L) + tracking.write(new Type1, true) + assert(type1 === 101L) + assert(type2 === 0L) + + when(store.count(classOf[Type1])).thenReturn(200L) + tracking.write(new Type1, true) + assert(type1 === 200L) + assert(type2 === 0L) + + when(store.count(classOf[Type2])).thenReturn(500L) + tracking.write(new Type2, true) + assert(type1 === 200L) + assert(type2 === 0L) + + when(store.count(classOf[Type2])).thenReturn(1000L) + tracking.write(new Type2, true) + assert(type1 === 200L) + assert(type2 === 0L) + + when(store.count(classOf[Type2])).thenReturn(2000L) + tracking.write(new Type2, true) + assert(type1 === 200L) + assert(type2 === 2000L) + + tracking.close(false) + assert(flushed) + verify(store, never()).close() + } + + private class Type1 + private class Type2 + +} diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala index 6a6c37873e1c2..fb21596cdffbf 100644 --- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala @@ -41,6 +41,7 @@ import org.apache.spark.api.java.StorageLevels import org.apache.spark.deploy.history.HistoryServerSuite import org.apache.spark.shuffle.FetchFailedException import org.apache.spark.status.api.v1.{JacksonMessageWriter, RDDDataDistribution, StageStatus} +import org.apache.spark.status.config._ private[spark] class SparkUICssErrorHandler extends DefaultCssErrorHandler { @@ -524,14 +525,15 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B } } - ignore("stage & job retention") { + test("stage & job retention") { val conf = new SparkConf() .setMaster("local") .setAppName("test") .set("spark.ui.enabled", "true") .set("spark.ui.port", "0") - .set("spark.ui.retainedStages", "3") - .set("spark.ui.retainedJobs", "2") + .set(MAX_RETAINED_STAGES, 3) + .set(MAX_RETAINED_JOBS, 2) + .set(ASYNC_TRACKING_ENABLED, false) val sc = new SparkContext(conf) assert(sc.ui.isDefined) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala index c018fc8a332fa..fe0ad39c29025 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala @@ -95,4 +95,11 @@ object StaticSQLConf { .stringConf .toSequence .createOptional + + val UI_RETAINED_EXECUTIONS = + buildStaticConf("spark.sql.ui.retainedExecutions") + .doc("Number of executions to retain in the Spark UI.") + .intConf + .createWithDefault(1000) + } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala index 43cec4807ae4d..cf0000c6393a3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala @@ -27,14 +27,15 @@ import org.apache.spark.internal.Logging import org.apache.spark.scheduler._ import org.apache.spark.sql.execution.SQLExecution import org.apache.spark.sql.execution.metric._ -import org.apache.spark.status.LiveEntity +import org.apache.spark.sql.internal.StaticSQLConf._ +import org.apache.spark.status.{ElementTrackingStore, KVUtils, LiveEntity} import org.apache.spark.status.config._ import org.apache.spark.ui.SparkUI import org.apache.spark.util.kvstore.KVStore private[sql] class SQLAppStatusListener( conf: SparkConf, - kvstore: KVStore, + kvstore: ElementTrackingStore, live: Boolean, ui: Option[SparkUI] = None) extends SparkListener with Logging { @@ -51,6 +52,23 @@ private[sql] class SQLAppStatusListener( private var uiInitialized = false + kvstore.addTrigger(classOf[SQLExecutionUIData], conf.get(UI_RETAINED_EXECUTIONS)) { count => + cleanupExecutions(count) + } + + kvstore.onFlush { + if (!live) { + val now = System.nanoTime() + liveExecutions.values.asScala.foreach { exec => + // This saves the partial aggregated metrics to the store; this works currently because + // when the SHS sees an updated event log, all old data for the application is thrown + // away. 
+ exec.metricsValues = aggregateMetrics(exec) + exec.write(kvstore, now) + } + } + } + override def onJobStart(event: SparkListenerJobStart): Unit = { val executionIdString = event.properties.getProperty(SQLExecution.EXECUTION_ID_KEY) if (executionIdString == null) { @@ -317,6 +335,17 @@ private[sql] class SQLAppStatusListener( } } + private def cleanupExecutions(count: Long): Unit = { + val countToDelete = count - conf.get(UI_RETAINED_EXECUTIONS) + if (countToDelete <= 0) { + return + } + + val toDelete = KVUtils.viewToSeq(kvstore.view(classOf[SQLExecutionUIData]), + countToDelete.toInt) { e => e.completionTime.isDefined } + toDelete.foreach { e => kvstore.delete(e.getClass(), e.executionId) } + } + } private class LiveExecutionData(val executionId: Long) extends LiveEntity { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala index 586d3ae411c74..7fd5f7395cdf3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala @@ -27,7 +27,7 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize import org.apache.spark.{JobExecutionStatus, SparkConf} import org.apache.spark.scheduler.SparkListener -import org.apache.spark.status.AppStatusPlugin +import org.apache.spark.status.{AppStatusPlugin, ElementTrackingStore} import org.apache.spark.status.KVUtils.KVIndexParam import org.apache.spark.ui.SparkUI import org.apache.spark.util.Utils @@ -84,7 +84,7 @@ private[sql] class SQLAppStatusPlugin extends AppStatusPlugin { override def setupListeners( conf: SparkConf, - store: KVStore, + store: ElementTrackingStore, addListenerFn: SparkListener => Unit, live: Boolean): Unit = { // For live applications, the listener is installed in [[setupUI]]. This also avoids adding @@ -100,7 +100,8 @@ private[sql] class SQLAppStatusPlugin extends AppStatusPlugin { case Some(sc) => // If this is a live application, then install a listener that will enable the SQL // tab as soon as there's a SQL event posted to the bus. 
- val listener = new SQLAppStatusListener(sc.conf, ui.store.store, true, Some(ui)) + val listener = new SQLAppStatusListener(sc.conf, + ui.store.store.asInstanceOf[ElementTrackingStore], true, Some(ui)) sc.listenerBus.addToStatusQueue(listener) case _ => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala index eba8d55daad58..932950687942c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala @@ -36,6 +36,7 @@ import org.apache.spark.sql.catalyst.util.quietly import org.apache.spark.sql.execution.{LeafExecNode, QueryExecution, SparkPlanInfo, SQLExecution} import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.status.ElementTrackingStore import org.apache.spark.status.config._ import org.apache.spark.util.{AccumulatorMetadata, JsonProtocol, LongAccumulator} import org.apache.spark.util.kvstore.InMemoryStore @@ -43,7 +44,9 @@ import org.apache.spark.util.kvstore.InMemoryStore class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTestUtils { import testImplicits._ - override protected def sparkConf = super.sparkConf.set(LIVE_ENTITY_UPDATE_PERIOD, 0L) + override protected def sparkConf = { + super.sparkConf.set(LIVE_ENTITY_UPDATE_PERIOD, 0L).set(ASYNC_TRACKING_ENABLED, false) + } private def createTestDataFrame: DataFrame = { Seq( @@ -107,10 +110,12 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest private def sqlStoreTest(name: String) (fn: (SQLAppStatusStore, SparkListenerBus) => Unit): Unit = { test(name) { - val store = new InMemoryStore() + val conf = sparkConf + val store = new ElementTrackingStore(new InMemoryStore(), conf) val bus = new ReplayListenerBus() - val listener = new SQLAppStatusListener(sparkConf, store, true) + val listener = new SQLAppStatusListener(conf, store, true) bus.addListener(listener) + store.close(false) val sqlStore = new SQLAppStatusStore(store, Some(listener)) fn(sqlStore, bus) } @@ -491,15 +496,15 @@ private case class MyPlan(sc: SparkContext, expectedValue: Long) extends LeafExe class SQLListenerMemoryLeakSuite extends SparkFunSuite { - // TODO: this feature is not yet available in SQLAppStatusStore. - ignore("no memory leak") { - quietly { - val conf = new SparkConf() - .setMaster("local") - .setAppName("test") - .set(config.MAX_TASK_FAILURES, 1) // Don't retry the tasks to run this test quickly - .set("spark.sql.ui.retainedExecutions", "50") // Set it to 50 to run this test quickly - withSpark(new SparkContext(conf)) { sc => + test("no memory leak") { + val conf = new SparkConf() + .setMaster("local") + .setAppName("test") + .set(config.MAX_TASK_FAILURES, 1) // Don't retry the tasks to run this test quickly + .set("spark.sql.ui.retainedExecutions", "50") // Set it to 50 to run this test quickly + .set(ASYNC_TRACKING_ENABLED, false) + withSpark(new SparkContext(conf)) { sc => + quietly { val spark = new SparkSession(sc) import spark.implicits._ // Run 100 successful executions and 100 failed executions. From e09a376edc6707e564e9f89a202c5347f37ad738 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Thu, 16 Nov 2017 21:22:53 -0800 Subject: [PATCH 2/5] Indent. 
--- .../scala/org/apache/spark/status/ElementTrackingStore.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala b/core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala index 282ed35d9b528..1d808513088be 100644 --- a/core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala +++ b/core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala @@ -111,7 +111,7 @@ private[spark] class ElementTrackingStore(store: KVStore, conf: SparkConf) exten doAsync { val count = store.count(value.getClass()) list.foreach { t => - if (count > t.threshold) { + if (count > t.threshold) { t.action(count) } } From 3f7c25d00a8c38eb3a1e0c1b28342e2c2f13cb4b Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Wed, 29 Nov 2017 14:51:17 -0800 Subject: [PATCH 3/5] Add comment. --- core/src/main/scala/org/apache/spark/status/LiveEntity.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala index 6534d32b7c523..39f9b0bfa7338 100644 --- a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala +++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala @@ -42,6 +42,8 @@ private[spark] abstract class LiveEntity { var lastWriteTime = 0L def write(store: ElementTrackingStore, now: Long, checkTriggers: Boolean = false): Unit = { + // Always check triggers on the first write, since adding an element to the store may + // cause the maximum count for the element type to be exceeded. store.write(doUpdate(), checkTriggers || lastWriteTime == 0L) lastWriteTime = now } From b02ea2c4a96e27918c02f75c19d6639daae7cb2d Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Wed, 13 Dec 2017 15:31:09 -0800 Subject: [PATCH 4/5] Feedback. 
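Among other feedback items, the diff below drops the Option wrapper around the tracking store's executor and substitutes Guava's same-thread executor when spark.appStateStore.asyncTracking.enable is false, giving doAsync a single code path. A minimal sketch of that pattern outside Spark (the AsyncRunner name and the use of a plain JDK executor are illustrative, not part of the patch):

import java.util.concurrent.{ExecutorService, Executors}

import com.google.common.util.concurrent.MoreExecutors

// Illustrative wrapper: choose the executor once, then submit work unconditionally.
class AsyncRunner(async: Boolean) {
  private val executor: ExecutorService =
    if (async) {
      Executors.newSingleThreadExecutor()
    } else {
      // Runs each submitted task directly on the thread that calls submit().
      MoreExecutors.sameThreadExecutor()
    }

  def doAsync(fn: => Unit): Unit = {
    executor.submit(new Runnable {
      override def run(): Unit = fn
    })
  }
}

// new AsyncRunner(async = false).doAsync { ... } runs the block before doAsync returns.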
--- .../deploy/history/FsHistoryProvider.scala | 6 ++-- .../spark/status/AppStatusListener.scala | 19 +++++++--- .../spark/status/ElementTrackingStore.scala | 36 ++++++++----------- .../org/apache/spark/status/LiveEntity.scala | 4 +-- .../spark/status/AppStatusListenerSuite.scala | 28 +++++++++------ 5 files changed, 50 insertions(+), 43 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 37a2229fa21fe..1d0e2e5055586 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -320,11 +320,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } val trackingStore = new ElementTrackingStore(kvstore, conf) - val listener = if (needReplay) { + if (needReplay) { val replayBus = new ReplayListenerBus() - val _listener = new AppStatusListener(trackingStore, conf, false, + val listener = new AppStatusListener(trackingStore, conf, false, lastUpdateTime = Some(attempt.info.lastUpdated.getTime())) - replayBus.addListener(_listener) + replayBus.addListener(listener) AppStatusPlugin.loadPlugins().foreach { plugin => plugin.setupListeners(conf, trackingStore, l => replayBus.addListener(l), false) } diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index b0864cf79e855..1fb7b76d43d04 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -898,19 +898,28 @@ private[spark] class AppStatusListener( } private def cleanupTasks(stage: LiveStage): Unit = { - val countToDelete = calculateNumberToRemove(stage.savedTasks.get(), maxTasksPerStage) - if (countToDelete > 0L) { + val countToDelete = calculateNumberToRemove(stage.savedTasks.get(), maxTasksPerStage).toInt + if (countToDelete > 0) { val stageKey = Array(stage.info.stageId, stage.info.attemptId) val view = kvstore.view(classOf[TaskDataWrapper]).index("stage").first(stageKey) .last(stageKey) - // On live applications, try to delete finished tasks only; when in the SHS, treat all - // tasks as the same. - val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt) { t => + // Try to delete finished tasks only. + val toDelete = KVUtils.viewToSeq(view, countToDelete) { t => !live || t.info.status != TaskState.RUNNING.toString() } toDelete.foreach { t => kvstore.delete(t.getClass(), t.info.taskId) } stage.savedTasks.addAndGet(-toDelete.size) + + // If there are more running tasks than the configured limit, delete running tasks. This + // should be extremely rare since the limit should generally far exceed the number of tasks + // that can run in parallel. 
+      val remaining = countToDelete - toDelete.size
+      if (remaining > 0) {
+        val runningTasksToDelete = view.max(remaining).iterator().asScala.toList
+        runningTasksToDelete.foreach { t => kvstore.delete(t.getClass(), t.info.taskId) }
+        stage.savedTasks.addAndGet(-remaining)
+      }
     }
     stage.cleaning = false
   }
diff --git a/core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala b/core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala
index 1d808513088be..863b0967f765e 100644
--- a/core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala
+++ b/core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala
@@ -21,6 +21,8 @@ import java.util.concurrent.TimeUnit
 
 import scala.collection.mutable.{HashMap, ListBuffer}
 
+import com.google.common.util.concurrent.MoreExecutors
+
 import org.apache.spark.SparkConf
 import org.apache.spark.util.{ThreadUtils, Utils}
 import org.apache.spark.util.kvstore._
@@ -38,8 +40,8 @@ import org.apache.spark.util.kvstore._
  *  - a generic flush mechanism so that listeners can be notified about when they should flush
  *    internal state to the store (e.g. after the SHS finishes parsing an event log).
  *
- * The configured triggers are run on the same thread that triggered the write, after the write
- * has completed.
+ * The configured triggers are run on a separate thread by default; they can be forced to run on
+ * the calling thread by setting the `ASYNC_TRACKING_ENABLED` configuration to `false`.
  */
 private[spark] class ElementTrackingStore(store: KVStore, conf: SparkConf) extends KVStore {
 
@@ -48,9 +50,9 @@ private[spark] class ElementTrackingStore(store: KVStore, conf: SparkConf) exten
   private val triggers = new HashMap[Class[_], Seq[Trigger[_]]]()
   private val flushTriggers = new ListBuffer[() => Unit]()
   private val executor = if (conf.get(ASYNC_TRACKING_ENABLED)) {
-    Some(ThreadUtils.newDaemonSingleThreadExecutor("element-tracking-store-worker"))
+    ThreadUtils.newDaemonSingleThreadExecutor("element-tracking-store-worker")
   } else {
-    None
+    MoreExecutors.sameThreadExecutor()
   }
 
   @volatile private var stopped = false
@@ -59,9 +61,6 @@ private[spark] class ElementTrackingStore(store: KVStore, conf: SparkConf) exten
    * Register a trigger that will be fired once the number of elements of a given type reaches
    * the given threshold.
    *
-   * Triggers are fired in a separate thread, so that they can do more expensive operations
-   * than would be allowed on the main threads populating the store.
-   *
    * @param klass The type to monitor.
    * @param threshold The number of elements that should trigger the action.
   * @param action Action to run when the threshold is reached; takes as a parameter the number
@@ -84,18 +83,13 @@ private[spark] class ElementTrackingStore(store: KVStore, conf: SparkConf) exten
   }
 
   /**
-   * Enqueues an action to be executed asynchronously.
+   * Enqueues an action to be executed asynchronously. The task will run on the calling thread if
+   * `ASYNC_TRACKING_ENABLED` is `false`.
    */
   def doAsync(fn: => Unit): Unit = {
-    executor match {
-      case Some(exec) =>
-        exec.submit(new Runnable() {
-          override def run(): Unit = Utils.tryLog { fn }
-        })
-
-      case _ =>
-        fn
-    }
+    executor.submit(new Runnable() {
+      override def run(): Unit = Utils.tryLog { fn }
+    })
   }
 
   override def read[T](klass: Class[T], naturalKey: Any): T = store.read(klass, naturalKey)
@@ -145,11 +139,9 @@ private[spark] class ElementTrackingStore(store: KVStore, conf: SparkConf) exten
     }
 
     stopped = true
-    executor.foreach { exec =>
-      exec.shutdown()
-      if (!exec.awaitTermination(5, TimeUnit.SECONDS)) {
-        exec.shutdownNow()
-      }
+    executor.shutdown()
+    if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
+      executor.shutdownNow()
     }
 
     flushTriggers.foreach { trigger =>
diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
index 39f9b0bfa7338..52e83f250d34e 100644
--- a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
+++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
@@ -39,12 +39,12 @@ import org.apache.spark.util.kvstore.KVStore
  */
 private[spark] abstract class LiveEntity {
 
-  var lastWriteTime = 0L
+  var lastWriteTime = -1L
 
   def write(store: ElementTrackingStore, now: Long, checkTriggers: Boolean = false): Unit = {
     // Always check triggers on the first write, since adding an element to the store may
     // cause the maximum count for the element type to be exceeded.
-    store.write(doUpdate(), checkTriggers || lastWriteTime == 0L)
+    store.write(doUpdate(), checkTriggers || lastWriteTime == -1L)
     lastWriteTime = now
   }
 
diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
index 9c08a0ae1d951..9cf4f7efb24a8 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
@@ -864,8 +864,8 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
       store.read(classOf[JobDataWrapper], 2)
     }
 
-    // Start 3 stages, all should be kept. Stop 2 of them, the oldest stopped one should be
-    // deleted. Start a new attempt of the second stopped one, and verify that the stage graph
+    // Start 3 stages, all should be kept. Stop 2 of them, the stopped one with the lowest id should
+    // be deleted. Start a new attempt of the second stopped one, and verify that the stage graph
     // data is not deleted.
     time += 1
     val stages = Seq(
@@ -912,25 +912,31 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
     }
     store.read(classOf[StageDataWrapper], Array(3, 1))
 
-    // Start 3 tasks and stop two of them. The oldest should be deleted.
+    // Start 2 tasks. Finish the second one.
     time += 1
-    val tasks = createTasks(3, Array("1"))
+    val tasks = createTasks(2, Array("1"))
     tasks.foreach { task =>
       listener.onTaskStart(SparkListenerTaskStart(attempt2.stageId, attempt2.attemptId, task))
     }
-    assert(store.count(classOf[TaskDataWrapper]) === 3)
+    assert(store.count(classOf[TaskDataWrapper]) === 2)
 
-    tasks.drop(1).foreach { task =>
-      time += 1
-      task.markFinished(TaskState.FINISHED, time)
-      listener.onTaskEnd(SparkListenerTaskEnd(attempt2.stageId, attempt2.attemptId,
-        "taskType", TaskResultLost, task, null))
+    // Start a 3rd task. The finished tasks should be deleted.
+    createTasks(1, Array("1")).foreach { task =>
+      listener.onTaskStart(SparkListenerTaskStart(attempt2.stageId, attempt2.attemptId, task))
     }
     assert(store.count(classOf[TaskDataWrapper]) === 2)
     intercept[NoSuchElementException] {
-      store.read(classOf[TaskDataWrapper], tasks.drop(1).head.id)
+      store.read(classOf[TaskDataWrapper], tasks.last.id)
     }
+    // Start a 4th task. The first task should be deleted, even if it's still running.
+    createTasks(1, Array("1")).foreach { task =>
+      listener.onTaskStart(SparkListenerTaskStart(attempt2.stageId, attempt2.attemptId, task))
+    }
+    assert(store.count(classOf[TaskDataWrapper]) === 2)
+    intercept[NoSuchElementException] {
+      store.read(classOf[TaskDataWrapper], tasks.head.id)
+    }
   }
 
   private def key(stage: StageInfo): Array[Int] = Array(stage.stageId, stage.attemptId)

From d384ff44380f940a611307881c6f5dca19e526e7 Mon Sep 17 00:00:00 2001
From: Marcelo Vanzin
Date: Thu, 14 Dec 2017 13:39:03 -0800
Subject: [PATCH 5/5] Add comment.

---
 .../org/apache/spark/deploy/history/FsHistoryProvider.scala | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 1d0e2e5055586..fa2c5194aa41b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -305,6 +305,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     val (kvstore, needReplay) = uiStorePath match {
       case Some(path) =>
         try {
+          // The store path is not guaranteed to exist - maybe it hasn't been created, or was
+          // invalidated because changes to the event log were detected. Need to replay in that
+          // case.
          val _replay = !path.isDirectory()
          (createDiskStore(path, conf), _replay)
        } catch {
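
For reference, the tracking API this series settles on can be exercised end to end with a short,
self-contained sketch like the one below. The JobSample class, the threshold of 3 and the println
bodies are invented for illustration; addTrigger(klass, threshold)(action), onFlush,
write(value, checkTriggers) and close() are assumed to match the final state of
ElementTrackingStore in this series, and InMemoryStore and KVIndex come from
org.apache.spark.util.kvstore. Since ElementTrackingStore is private[spark], code like this has
to live under an org.apache.spark package (for example next to the new ElementTrackingStoreSuite):

    package org.apache.spark.status

    import org.apache.spark.SparkConf
    import org.apache.spark.util.kvstore.{InMemoryStore, KVIndex}

    // Hypothetical element type; any class exposing a @KVIndex natural key can be stored.
    class JobSample(val id: Int) {
      @KVIndex
      def key: Int = id
    }

    object ElementTrackingStoreSketch {

      def main(args: Array[String]): Unit = {
        val store = new ElementTrackingStore(new InMemoryStore(), new SparkConf())

        // Fires once more than 3 JobSample elements are in the store; the real listeners
        // use such triggers to delete old jobs/stages/tasks down to the retention limits.
        store.addTrigger(classOf[JobSample], 3) { count =>
          println(s"JobSample trigger fired, count = $count")
        }

        // Runs when the store is closed; listeners use this to flush partial state.
        store.onFlush {
          println("flushing partial state before close")
        }

        // Writes with checkTriggers = true schedule the threshold check (asynchronously by
        // default, inline when ASYNC_TRACKING_ENABLED is false).
        (1 to 5).foreach { i => store.write(new JobSample(i), checkTriggers = true) }

        store.close()
      }
    }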