@@ -44,6 +44,7 @@ import org.apache.spark.scheduler.ReplayListenerBus._
import org.apache.spark.status._
import org.apache.spark.status.KVUtils._
import org.apache.spark.status.api.v1.{ApplicationAttemptInfo, ApplicationInfo}
import org.apache.spark.status.config._
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
import org.apache.spark.util.kvstore._
@@ -304,6 +305,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
val (kvstore, needReplay) = uiStorePath match {
case Some(path) =>
try {
// The store path is not guaranteed to exist - maybe it hasn't been created, or was
// invalidated because changes to the event log were detected. Need to replay in that
// case.
val _replay = !path.isDirectory()
(createDiskStore(path, conf), _replay)
} catch {
@@ -318,24 +322,23 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
(new InMemoryStore(), true)
@squito (Contributor), Dec 14, 2017:

Not related to this change; the comment is really for `val _replay = !path.isDirectory()`, but GitHub won't let me put a comment there...

I know we discussed this when you made this change, but I was still confused reading this bit of code on replay. Could you just add a comment above that line, something like "the kvstore is deleted when we decide that the loaded data is stale -- see LoadedAppUI for a more extensive discussion of the lifecycle"?

Doesn't seem worth a separate JIRA / PR just for that comment.
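A minimal sketch of where the suggested comment could sit (the placement is an assumption; the first comment block quotes the reviewer's wording above, the second is already in the diff):

// The kvstore is deleted when we decide that the loaded data is stale -- see
// LoadedAppUI for a more extensive discussion of the lifecycle.
// The store path is not guaranteed to exist - maybe it hasn't been created, or was
// invalidated because changes to the event log were detected. Need to replay in that case.
val _replay = !path.isDirectory()
(createDiskStore(path, conf), _replay)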

}

val trackingStore = new ElementTrackingStore(kvstore, conf)
if (needReplay) {
val replayBus = new ReplayListenerBus()
val listener = new AppStatusListener(kvstore, conf, false,
val listener = new AppStatusListener(trackingStore, conf, false,
lastUpdateTime = Some(attempt.info.lastUpdated.getTime()))
replayBus.addListener(listener)
AppStatusPlugin.loadPlugins().foreach { plugin =>
plugin.setupListeners(conf, kvstore, l => replayBus.addListener(l), false)
plugin.setupListeners(conf, trackingStore, l => replayBus.addListener(l), false)
}
try {
val fileStatus = fs.getFileStatus(new Path(logDir, attempt.logPath))
replay(fileStatus, isApplicationCompleted(fileStatus), replayBus)
listener.flush()
trackingStore.close(false)
} catch {
case e: Exception =>
try {
kvstore.close()
} catch {
case _e: Exception => logInfo("Error closing store.", _e)
Utils.tryLogNonFatalError {
trackingStore.close()
}
uiStorePath.foreach(Utils.deleteRecursively)
if (e.isInstanceOf[FileNotFoundException]) {
@@ -232,11 +232,6 @@ package object config {
.stringConf
.createOptional

// To limit memory usage, we only track information for a fixed number of tasks
private[spark] val UI_RETAINED_TASKS = ConfigBuilder("spark.ui.retainedTasks")
.intConf
.createWithDefault(100000)

// To limit how many applications are shown in the History Server summary ui
private[spark] val HISTORY_UI_MAX_APPS =
ConfigBuilder("spark.history.ui.maxApplications").intConf.createWithDefault(Integer.MAX_VALUE)
188 changes: 176 additions & 12 deletions core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -32,7 +32,6 @@ import org.apache.spark.status.api.v1
import org.apache.spark.storage._
import org.apache.spark.ui.SparkUI
import org.apache.spark.ui.scope._
import org.apache.spark.util.kvstore.KVStore

/**
* A Spark listener that writes application information to a data store. The types written to the
@@ -42,7 +41,7 @@ import org.apache.spark.util.kvstore.KVStore
* unfinished tasks can be more accurately calculated (see SPARK-21922).
*/
private[spark] class AppStatusListener(
kvstore: KVStore,
kvstore: ElementTrackingStore,
conf: SparkConf,
live: Boolean,
lastUpdateTime: Option[Long] = None) extends SparkListener with Logging {
@@ -51,13 +50,15 @@ private[spark] class AppStatusListener(

private var sparkVersion = SPARK_VERSION
private var appInfo: v1.ApplicationInfo = null
private var appSummary = new AppSummary(0, 0)
private var coresPerTask: Int = 1

// How often to update live entities. -1 means "never update" when replaying applications,
// meaning only the last write will happen. For live applications, this avoids a few
// operations that we can live without when rapidly processing incoming task events.
private val liveUpdatePeriodNs = if (live) conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L

private val maxTasksPerStage = conf.get(MAX_RETAINED_TASKS_PER_STAGE)
private val maxGraphRootNodes = conf.get(MAX_RETAINED_ROOT_NODES)

// Keep track of live entities, so that task metrics can be efficiently updated (without
@@ -68,10 +69,25 @@ private[spark] class AppStatusListener(
private val liveTasks = new HashMap[Long, LiveTask]()
private val liveRDDs = new HashMap[Int, LiveRDD]()
private val pools = new HashMap[String, SchedulerPool]()
// Keep the active executor count as a separate variable to avoid having to do synchronization
// around liveExecutors.
@volatile private var activeExecutorCount = 0

override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
case SparkListenerLogStart(version) => sparkVersion = version
case _ =>
kvstore.addTrigger(classOf[ExecutorSummaryWrapper], conf.get(MAX_RETAINED_DEAD_EXECUTORS))
{ count => cleanupExecutors(count) }

kvstore.addTrigger(classOf[JobDataWrapper], conf.get(MAX_RETAINED_JOBS)) { count =>
cleanupJobs(count)
}

kvstore.addTrigger(classOf[StageDataWrapper], conf.get(MAX_RETAINED_STAGES)) { count =>
cleanupStages(count)
}

kvstore.onFlush {
if (!live) {
flush()
Contributor:

Hm, why only flush for the history server?

Contributor (Author):

Because for live applications the store is only closed when the context is shut down, so at that point there's no more UI left for you to see this in.

}
}
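The trigger registrations and the onFlush hook above rely on ElementTrackingStore, whose implementation is not part of this hunk. As a rough, hedged sketch of the mechanics it appears to provide (class and member names below are assumptions for illustration, not the actual Spark code):

import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable

// Hypothetical sketch: count writes per element type, fire a callback once a type's count
// crosses its configured threshold, and run registered actions when the store is flushed.
class TrackingStoreSketch {
  private case class Trigger(threshold: Long, action: Long => Unit)

  private val counts = mutable.HashMap[Class[_], AtomicLong]()
  private val triggers = mutable.HashMap[Class[_], Trigger]()
  private val flushActions = mutable.ListBuffer[() => Unit]()

  def addTrigger(klass: Class[_], threshold: Long)(action: Long => Unit): Unit = synchronized {
    triggers(klass) = Trigger(threshold, action)
  }

  def onFlush(action: => Unit): Unit = synchronized {
    flushActions += (() => action)
  }

  def write(value: AnyRef, checkTriggers: Boolean = true): Unit = {
    // A real store would also persist `value`; this sketch only tracks write counts.
    val count = synchronized {
      counts.getOrElseUpdate(value.getClass, new AtomicLong())
    }.incrementAndGet()
    if (checkTriggers) {
      synchronized { triggers.get(value.getClass) }.foreach { t =>
        if (count > t.threshold) t.action(count)
      }
    }
  }

  def close(): Unit = synchronized { flushActions.toList }.foreach(_.apply())
}

Read against the diff: each addTrigger call registers a retention limit for one element type, and the onFlush { if (!live) flush() } block runs when the store is closed, which for a replayed application is the trackingStore.close(false) call in the FsHistoryProvider hunk above.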

override def onApplicationStart(event: SparkListenerApplicationStart): Unit = {
@@ -97,6 +113,7 @@ private[spark] class AppStatusListener(
Seq(attempt))

kvstore.write(new ApplicationInfoWrapper(appInfo))
kvstore.write(appSummary)
}

override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate): Unit = {
@@ -158,10 +175,11 @@ private[spark] class AppStatusListener(
override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit = {
liveExecutors.remove(event.executorId).foreach { exec =>
val now = System.nanoTime()
activeExecutorCount = math.max(0, activeExecutorCount - 1)
exec.isActive = false
exec.removeTime = new Date(event.time)
exec.removeReason = event.reason
update(exec, now)
update(exec, now, last = true)

// Remove all RDD distributions that reference the removed executor, in case there wasn't
// a corresponding event.
@@ -290,8 +308,11 @@
}

job.completionTime = if (event.time > 0) Some(new Date(event.time)) else None
update(job, now)
update(job, now, last = true)
}

appSummary = new AppSummary(appSummary.numCompletedJobs + 1, appSummary.numCompletedStages)
kvstore.write(appSummary)
}

override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {
@@ -350,6 +371,13 @@
job.activeTasks += 1
maybeUpdate(job, now)
}

if (stage.savedTasks.incrementAndGet() > maxTasksPerStage && !stage.cleaning) {
stage.cleaning = true
kvstore.doAsync {
cleanupTasks(stage)
}
}
}

liveExecutors.get(event.taskInfo.executorId).foreach { exec =>
@@ -449,6 +477,13 @@
esummary.metrics.update(metricsDelta)
}
maybeUpdate(esummary, now)

if (!stage.cleaning && stage.savedTasks.get() > maxTasksPerStage) {
stage.cleaning = true
kvstore.doAsync {
cleanupTasks(stage)
}
}
}

liveExecutors.get(event.taskInfo.executorId).foreach { exec =>
@@ -516,8 +551,11 @@
}

stage.executorSummaries.values.foreach(update(_, now))
update(stage, now)
update(stage, now, last = true)
}

appSummary = new AppSummary(appSummary.numCompletedJobs, appSummary.numCompletedStages + 1)
kvstore.write(appSummary)
}

override def onBlockManagerAdded(event: SparkListenerBlockManagerAdded): Unit = {
@@ -573,7 +611,7 @@
}

/** Flush all live entities' data to the underlying store. */
def flush(): Unit = {
private def flush(): Unit = {
val now = System.nanoTime()
liveStages.values.asScala.foreach { stage =>
update(stage, now)
@@ -708,7 +746,10 @@
}

private def getOrCreateExecutor(executorId: String, addTime: Long): LiveExecutor = {
liveExecutors.getOrElseUpdate(executorId, new LiveExecutor(executorId, addTime))
liveExecutors.getOrElseUpdate(executorId, {
activeExecutorCount += 1
new LiveExecutor(executorId, addTime)
})
}

private def updateStreamBlock(event: SparkListenerBlockUpdated, stream: StreamBlockId): Unit = {
@@ -754,8 +795,8 @@
}
}

private def update(entity: LiveEntity, now: Long): Unit = {
entity.write(kvstore, now)
private def update(entity: LiveEntity, now: Long, last: Boolean = false): Unit = {
Member:

Nit: maybe change `last` to `isLast`?

Contributor (Author):

I prefer how the current version reads at the call site, e.g.:

update(exec, now, last = true)

Also, Spark generally avoids Java-beans-style prefixes (like "is" or "get") in Scala code.

Contributor:

Maybe `checkTriggers` is better than `last`?

entity.write(kvstore, now, checkTriggers = last)
}

/** Update a live entity only if it hasn't been updated in the last configured period. */
@@ -772,4 +813,127 @@
}
}

private def cleanupExecutors(count: Long): Unit = {
// Because the limit is on the number of *dead* executors, we need to calculate whether
// there are actually enough dead executors to be deleted.
val threshold = conf.get(MAX_RETAINED_DEAD_EXECUTORS)
val dead = count - activeExecutorCount
Contributor:

KVStore has this:

  /**
   * Returns the number of items of the given type which match the given indexed value.
   */
  long count(Class<?> type, String index, Object indexedValue) throws Exception;

So with an API change you could get the right number directly from the store. (Though this conflicts with my other comment about not using kvstore.count() at all in the trigger, which I think is more important.)
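A hedged sketch of that suggestion (not part of this PR; it assumes the tracking store were extended to expose KVStore's count(type, index, indexedValue) quoted above):

// Hypothetical variant: the "active" index already exists on ExecutorSummaryWrapper
// (the deletion query below uses it), so an indexed count of active == false would give
// the number of dead executors without maintaining activeExecutorCount in the listener.
val dead = kvstore.count(classOf[ExecutorSummaryWrapper], "active", false)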


if (dead > threshold) {
val countToDelete = calculateNumberToRemove(dead, threshold)
val toDelete = kvstore.view(classOf[ExecutorSummaryWrapper]).index("active")
.max(countToDelete).first(false).last(false).asScala.toSeq
toDelete.foreach { e => kvstore.delete(e.getClass(), e.info.id) }
}
}

private def cleanupJobs(count: Long): Unit = {
val countToDelete = calculateNumberToRemove(count, conf.get(MAX_RETAINED_JOBS))
if (countToDelete <= 0L) {
return
}

val toDelete = KVUtils.viewToSeq(kvstore.view(classOf[JobDataWrapper]),
countToDelete.toInt) { j =>
j.info.status != JobExecutionStatus.RUNNING && j.info.status != JobExecutionStatus.UNKNOWN
}
toDelete.foreach { j => kvstore.delete(j.getClass(), j.info.jobId) }
}

private def cleanupStages(count: Long): Unit = {
val countToDelete = calculateNumberToRemove(count, conf.get(MAX_RETAINED_STAGES))
if (countToDelete <= 0L) {
return
}

val stages = KVUtils.viewToSeq(kvstore.view(classOf[StageDataWrapper]),
countToDelete.toInt) { s =>
s.info.status != v1.StageStatus.ACTIVE && s.info.status != v1.StageStatus.PENDING
}

stages.foreach { s =>
val key = s.id
kvstore.delete(s.getClass(), key)

val execSummaries = kvstore.view(classOf[ExecutorStageSummaryWrapper])
.index("stage")
.first(key)
.last(key)
.asScala
.toSeq
execSummaries.foreach { e =>
kvstore.delete(e.getClass(), e.id)
}

val tasks = kvstore.view(classOf[TaskDataWrapper])
.index("stage")
.first(key)
.last(key)
.asScala

tasks.foreach { t =>
kvstore.delete(t.getClass(), t.info.taskId)
}

// Check whether there are remaining attempts for the same stage. If there aren't, then
// also delete the RDD graph data.
val remainingAttempts = kvstore.view(classOf[StageDataWrapper])
.index("stageId")
.first(s.stageId)
.last(s.stageId)
.closeableIterator()

val hasMoreAttempts = try {
remainingAttempts.asScala.exists { other =>
other.info.attemptId != s.info.attemptId
}
} finally {
remainingAttempts.close()
}

if (!hasMoreAttempts) {
kvstore.delete(classOf[RDDOperationGraphWrapper], s.stageId)
}
}
}

private def cleanupTasks(stage: LiveStage): Unit = {
val countToDelete = calculateNumberToRemove(stage.savedTasks.get(), maxTasksPerStage).toInt
if (countToDelete > 0) {
val stageKey = Array(stage.info.stageId, stage.info.attemptId)
val view = kvstore.view(classOf[TaskDataWrapper]).index("stage").first(stageKey)
.last(stageKey)

// Try to delete finished tasks only.
val toDelete = KVUtils.viewToSeq(view, countToDelete) { t =>
!live || t.info.status != TaskState.RUNNING.toString()
}
toDelete.foreach { t => kvstore.delete(t.getClass(), t.info.taskId) }
stage.savedTasks.addAndGet(-toDelete.size)

// If there are more running tasks than the configured limit, delete running tasks. This
// should be extremely rare since the limit should generally far exceed the number of tasks
// that can run in parallel.
val remaining = countToDelete - toDelete.size
if (remaining > 0) {
val runningTasksToDelete = view.max(remaining).iterator().asScala.toList
runningTasksToDelete.foreach { t => kvstore.delete(t.getClass(), t.info.taskId) }
stage.savedTasks.addAndGet(-remaining)
}
}
stage.cleaning = false
}

/**
* Remove at least (retainedSize / 10) items to reduce friction. Because tracking may be done
* asynchronously, this method may return 0 in case enough items have been deleted already.
*/
private def calculateNumberToRemove(dataSize: Long, retainedSize: Long): Long = {
if (dataSize > retainedSize) {
math.max(retainedSize / 10L, dataSize - retainedSize)
} else {
0L
}
}

}
@@ -48,7 +48,7 @@ private[spark] trait AppStatusPlugin {
*/
def setupListeners(
conf: SparkConf,
store: KVStore,
store: ElementTrackingStore,
addListenerFn: SparkListener => Unit,
live: Boolean): Unit

@@ -330,6 +330,10 @@ private[spark] class AppStatusStore(
store.read(classOf[PoolData], name)
}

def appSummary(): AppSummary = {
store.read(classOf[AppSummary], classOf[AppSummary].getName())
}
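The new appSummary() accessor is the read side of the kvstore.write(appSummary) calls added to AppStatusListener above. A rough usage sketch; the caller and variable names are assumptions, not code from this PR:

// Hypothetical caller, e.g. a UI page rendering the totals maintained by the listener.
val summary = statusStore.appSummary()
val completedJobs = summary.numCompletedJobs
val completedStages = summary.numCompletedStages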

def close(): Unit = {
store.close()
}
@@ -347,7 +351,7 @@ private[spark] object AppStatusStore {
* @param addListenerFn Function to register a listener with a bus.
*/
def createLiveStore(conf: SparkConf, addListenerFn: SparkListener => Unit): AppStatusStore = {
val store = new InMemoryStore()
val store = new ElementTrackingStore(new InMemoryStore(), conf)
val listener = new AppStatusListener(store, conf, true)
addListenerFn(listener)
AppStatusPlugin.loadPlugins().foreach { p =>