diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index 7da0a9d2285b..860ba34f8b97 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -69,13 +69,14 @@ private[spark] class AppStatusListener( // Keep track of live entities, so that task metrics can be efficiently updated (without // causing too many writes to the underlying store, and other expensive operations). - private val liveStages = new ConcurrentHashMap[(Int, Int), LiveStage]() - private val liveJobs = new HashMap[Int, LiveJob]() - private val liveExecutors = new HashMap[String, LiveExecutor]() - private val deadExecutors = new HashMap[String, LiveExecutor]() - private val liveTasks = new HashMap[Long, LiveTask]() - private val liveRDDs = new HashMap[Int, LiveRDD]() - private val pools = new HashMap[String, SchedulerPool]() + // variables are visible for tests. + private[spark] val liveStages = new ConcurrentHashMap[(Int, Int), LiveStage]() + private[spark] val liveJobs = new HashMap[Int, LiveJob]() + private[spark] val liveExecutors = new HashMap[String, LiveExecutor]() + private[spark] val deadExecutors = new HashMap[String, LiveExecutor]() + private[spark] val liveTasks = new HashMap[Long, LiveTask]() + private[spark] val liveRDDs = new HashMap[Int, LiveRDD]() + private[spark] val pools = new HashMap[String, SchedulerPool]() private val SQL_EXECUTION_ID_KEY = "spark.sql.execution.id" // Keep the active executor count as a separate variable to avoid having to do synchronization @@ -103,6 +104,87 @@ private[spark] class AppStatusListener( } } + // visible for tests + private[spark] def recoverLiveEntities(): Unit = { + if (!live) { + kvstore.view(classOf[JobDataWrapper]) + .asScala.filter(_.info.status == JobExecutionStatus.RUNNING) + .map(_.toLiveJob).foreach(job => liveJobs.put(job.jobId, job)) + + kvstore.view(classOf[ExecutorSummaryWrapper]).asScala.filter(_.info.isActive) + .map(_.toLiveExecutor).foreach(exec => liveExecutors.put(exec.executorId, exec)) + + kvstore.view(classOf[ExecutorSummaryWrapper]).asScala.filter(!_.info.isActive) + .map(_.toLiveExecutor).foreach(exec => deadExecutors.put(exec.executorId, exec)) + + kvstore.view(classOf[StageDataWrapper]).asScala + .filter { stageData => + stageData.info.status == v1.StageStatus.PENDING || + stageData.info.status == v1.StageStatus.ACTIVE || + (stageData.info.numActiveTasks > 0 && stageData.info.status != v1.StageStatus.SKIPPED) + }.map { stageData => + val stageId = stageData.info.stageId + val jobs = liveJobs.values.filter(_.stageIds.contains(stageId)).toSeq + stageData.toLiveStage(jobs) + }.foreach { stage => + val stageId = stage.info.stageId + val stageAttempt = stage.info.attemptNumber() + liveStages.put((stageId, stageAttempt), stage) + + kvstore.view(classOf[ExecutorStageSummaryWrapper]) + .index("stage") + .first(Array(stageId, stageAttempt)) + .last(Array(stageId, stageAttempt)) + .asScala + .map(_.toLiveExecutorStageSummary) + .foreach { esummary => + stage.executorSummaries.put(esummary.executorId, esummary) + if (esummary.isBlacklisted) { + stage.blackListedExecutors += esummary.executorId + liveExecutors.get(esummary.executorId).foreach(_.isBlacklisted = true) + liveExecutors.get(esummary.executorId).foreach(_.blacklistedInStages += stageId) + } + } + + kvstore.view(classOf[TaskDataWrapper]) + .parent(Array(stageId, stageAttempt)) + 
.index(TaskIndexNames.STATUS) + .first(TaskState.RUNNING.toString) + .last(TaskState.RUNNING.toString) + .asScala + .map(_.toLiveTask) + .foreach { task => + liveTasks.put(task.info.taskId, task) + stage.activeTasksPerExecutor(task.info.executorId) += 1 + } + + stage.savedTasks.addAndGet(kvstore.count(classOf[TaskDataWrapper]).intValue()) + } + + kvstore.view(classOf[RDDStorageInfoWrapper]).asScala + .foreach { rddWrapper => + val liveRdd = rddWrapper.toLiveRDD(liveExecutors) + liveRDDs.put(liveRdd.info.id, liveRdd) + } + + kvstore.view(classOf[PoolData]).asScala.foreach { poolData => + val schedulerPool = poolData.toSchedulerPool + pools.put(schedulerPool.name, schedulerPool) + } + } + } + + // used for tests only + private[spark] def clearLiveEntities(): Unit = { + liveStages.clear() + liveJobs.clear() + liveExecutors.clear() + deadExecutors.clear() + liveTasks.clear() + liveRDDs.clear() + pools.clear() + } + override def onOtherEvent(event: SparkListenerEvent): Unit = event match { case SparkListenerLogStart(version) => sparkVersion = version case _ => @@ -877,6 +959,12 @@ private[spark] class AppStatusListener( } } + // used in tests only + private[spark] def flush(): Unit = { + val now = System.nanoTime() + flush(update(_, now)) + } + /** Go through all `LiveEntity`s and use `entityFlushFunc(entity)` to flush them. */ private def flush(entityFlushFunc: LiveEntity => Unit): Unit = { liveStages.values.asScala.foreach { stage => diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala index 00c991b49920..38faa8a20ee6 100644 --- a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala +++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala @@ -59,14 +59,14 @@ private[spark] abstract class LiveEntity { } -private class LiveJob( +private[spark] class LiveJob( val jobId: Int, - name: String, + val name: String, val submissionTime: Option[Date], val stageIds: Seq[Int], - jobGroup: Option[String], - numTasks: Int, - sqlExecutionId: Option[Long]) extends LiveEntity { + val jobGroup: Option[String], + val numTasks: Int, + val sqlExecutionId: Option[Long]) extends LiveEntity { var activeTasks = 0 var completedTasks = 0 @@ -74,6 +74,8 @@ private class LiveJob( // Holds both the stage ID and the task index, packed into a single long value. val completedIndices = new OpenHashSet[Long]() + // will only be set when recover LiveJob is needed. + var numCompletedIndices = 0 var killedTasks = 0 var killedSummary: Map[String, Int] = Map() @@ -85,6 +87,8 @@ private class LiveJob( var completionTime: Option[Date] = None var completedStages: Set[Int] = Set() + // will only be set when recover LiveJob is needed. 
+ var numCompletedStages = 0 var activeStages = 0 var failedStages = 0 @@ -104,9 +108,9 @@ private class LiveJob( skippedTasks, failedTasks, killedTasks, - completedIndices.size, + completedIndices.size + numCompletedIndices, activeStages, - completedStages.size, + completedStages.size + numCompletedStages, skippedStages.size, failedStages, killedSummary) @@ -115,7 +119,7 @@ private class LiveJob( } -private class LiveTask( +private[spark] class LiveTask( var info: TaskInfo, stageId: Int, stageAttemptId: Int, @@ -229,7 +233,7 @@ private class LiveTask( } -private class LiveExecutor(val executorId: String, _addTime: Long) extends LiveEntity { +private[spark] class LiveExecutor(val executorId: String, _addTime: Long) extends LiveEntity { var hostPort: String = null var host: String = null @@ -272,7 +276,7 @@ private class LiveExecutor(val executorId: String, _addTime: Long) extends LiveE def hasMemoryInfo: Boolean = totalOnHeap >= 0L // peak values for executor level metrics - val peakExecutorMetrics = new ExecutorMetrics() + var peakExecutorMetrics = new ExecutorMetrics() def hostname: String = if (host != null) host else hostPort.split(":")(0) @@ -316,10 +320,10 @@ private class LiveExecutor(val executorId: String, _addTime: Long) extends LiveE } } -private class LiveExecutorStageSummary( +private[spark] class LiveExecutorStageSummary( stageId: Int, attemptId: Int, - executorId: String) extends LiveEntity { + val executorId: String) extends LiveEntity { import LiveEntityHelpers._ @@ -353,7 +357,7 @@ private class LiveExecutorStageSummary( } -private class LiveStage extends LiveEntity { +private[spark] class LiveStage extends LiveEntity { import LiveEntityHelpers._ @@ -370,6 +374,8 @@ private class LiveStage extends LiveEntity { var completedTasks = 0 var failedTasks = 0 val completedIndices = new OpenHashSet[Int]() + // will only be set when recover LiveStage is needed. + var numCompletedIndices = 0 var killedTasks = 0 var killedSummary: Map[String, Int] = Map() @@ -405,7 +411,7 @@ private class LiveStage extends LiveEntity { numCompleteTasks = completedTasks, numFailedTasks = failedTasks, numKilledTasks = killedTasks, - numCompletedIndices = completedIndices.size, + numCompletedIndices = completedIndices.size + numCompletedIndices, submissionTime = info.submissionTime.map(new Date(_)), firstTaskLaunchedTime = @@ -464,7 +470,7 @@ private class LiveStage extends LiveEntity { * used by the partition in the executors, and thus may differ from the storage level requested * by the application. */ -private class LiveRDDPartition(val blockName: String, rddLevel: StorageLevel) { +private[spark] class LiveRDDPartition(val blockName: String, rddLevel: StorageLevel) { import LiveEntityHelpers._ @@ -496,7 +502,7 @@ private class LiveRDDPartition(val blockName: String, rddLevel: StorageLevel) { } -private class LiveRDDDistribution(exec: LiveExecutor) { +private[spark] class LiveRDDDistribution(exec: LiveExecutor) { import LiveEntityHelpers._ @@ -513,6 +519,7 @@ private class LiveRDDDistribution(exec: LiveExecutor) { def toApi(): v1.RDDDataDistribution = { if (lastUpdate == null) { lastUpdate = new v1.RDDDataDistribution( + executorId, weakIntern(exec.hostPort), memoryUsed, exec.maxMemory - exec.memoryUsed, @@ -535,7 +542,7 @@ private class LiveRDDDistribution(exec: LiveExecutor) { * RDDs, this covers the case where an early stage is run on the unpersisted RDD, and a later stage * it started after the RDD is marked for caching. 
*/ -private class LiveRDD(val info: RDDInfo, storageLevel: StorageLevel) extends LiveEntity { +private[spark] class LiveRDD(val info: RDDInfo, storageLevel: StorageLevel) extends LiveEntity { import LiveEntityHelpers._ @@ -543,10 +550,10 @@ private class LiveRDD(val info: RDDInfo, storageLevel: StorageLevel) extends Liv var diskUsed = 0L private val levelDescription = weakIntern(storageLevel.description) - private val partitions = new HashMap[String, LiveRDDPartition]() - private val partitionSeq = new RDDPartitionSeq() + private[spark] val partitions = new HashMap[String, LiveRDDPartition]() + private[spark] val partitionSeq = new RDDPartitionSeq() - private val distributions = new HashMap[String, LiveRDDDistribution]() + private[spark] val distributions = new HashMap[String, LiveRDDDistribution]() def partition(blockName: String): LiveRDDPartition = { partitions.getOrElseUpdate(blockName, { @@ -600,7 +607,7 @@ private class LiveRDD(val info: RDDInfo, storageLevel: StorageLevel) extends Liv } -private class SchedulerPool(name: String) extends LiveEntity { +private[spark] class SchedulerPool(val name: String) extends LiveEntity { var stageIds = Set[Int]() @@ -750,7 +757,7 @@ private object LiveEntityHelpers { * Internally, the sequence is mutable, and elements can modify the data they expose. Additions and * removals are O(1). It is not safe to do multiple writes concurrently. */ -private class RDDPartitionSeq extends Seq[v1.RDDPartitionInfo] { +private[spark] class RDDPartitionSeq extends Seq[v1.RDDPartitionInfo] { @volatile private var _head: LiveRDDPartition = null @volatile private var _tail: LiveRDDPartition = null diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala index 5ec9b3639376..a69594f6d6b2 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala @@ -31,6 +31,8 @@ import org.apache.spark.JobExecutionStatus import org.apache.spark.executor.ExecutorMetrics import org.apache.spark.metrics.ExecutorMetricType import org.apache.spark.resource.ResourceInformation +import org.apache.spark.status.{LiveExecutor, LiveRDDDistribution, LiveRDDPartition} +import org.apache.spark.storage.StorageLevel case class ApplicationInfo private[spark]( id: String, @@ -181,6 +183,7 @@ class RDDStorageInfo private[spark]( val partitions: Option[Seq[RDDPartitionInfo]]) class RDDDataDistribution private[spark]( + val executorId: String, val address: String, val memoryUsed: Long, val memoryRemaining: Long, @@ -192,14 +195,35 @@ class RDDDataDistribution private[spark]( @JsonDeserialize(contentAs = classOf[JLong]) val onHeapMemoryRemaining: Option[Long], @JsonDeserialize(contentAs = classOf[JLong]) - val offHeapMemoryRemaining: Option[Long]) + val offHeapMemoryRemaining: Option[Long]) { + + private[spark] def toLiveRDDDistribution(executors: scala.collection.Map[String, LiveExecutor]) + : LiveRDDDistribution = { + val exec = executors.get(executorId).get + val liveRDDDistribution = new LiveRDDDistribution(exec) + liveRDDDistribution.memoryUsed = memoryUsed + liveRDDDistribution.diskUsed = diskUsed + liveRDDDistribution.onHeapUsed = onHeapMemoryUsed.getOrElse(0) + liveRDDDistribution.offHeapUsed = offHeapMemoryUsed.getOrElse(0) + liveRDDDistribution.lastUpdate = this + liveRDDDistribution + } +} class RDDPartitionInfo private[spark]( val blockName: String, val storageLevel: String, val memoryUsed: Long, val diskUsed: Long, - val executors: 
Seq[String]) + val executors: Seq[String]) { + + def toLiveRDDPartition: LiveRDDPartition = { + val liveRDDPartition = new LiveRDDPartition(blockName, + StorageLevel.fromDescription(storageLevel)) + liveRDDPartition.value = this + liveRDDPartition + } +} class StageData private[spark]( val status: StageStatus, diff --git a/core/src/main/scala/org/apache/spark/status/storeTypes.scala b/core/src/main/scala/org/apache/spark/status/storeTypes.scala index 9da5bea8bf5c..490c215702e5 100644 --- a/core/src/main/scala/org/apache/spark/status/storeTypes.scala +++ b/core/src/main/scala/org/apache/spark/status/storeTypes.scala @@ -23,8 +23,11 @@ import java.util.Date import com.fasterxml.jackson.annotation.JsonIgnore import com.fasterxml.jackson.databind.annotation.JsonDeserialize +import org.apache.spark.executor.ExecutorMetrics +import org.apache.spark.scheduler.{StageInfo, TaskInfo, TaskLocality} import org.apache.spark.status.KVUtils._ import org.apache.spark.status.api.v1._ +import org.apache.spark.storage.{RDDInfo, StorageLevel} import org.apache.spark.ui.scope._ import org.apache.spark.util.kvstore.KVIndex @@ -59,6 +62,37 @@ private[spark] class ExecutorSummaryWrapper(val info: ExecutorSummary) { @JsonIgnore @KVIndex("host") val host: String = info.hostPort.split(":")(0) + def toLiveExecutor: LiveExecutor = { + val liveExecutor = new LiveExecutor(info.id, info.addTime.getTime) + liveExecutor.hostPort = info.hostPort + liveExecutor.host = info.hostPort.split(":")(0) + liveExecutor.isActive = info.isActive + liveExecutor.totalCores = info.totalCores + liveExecutor.rddBlocks = info.rddBlocks + liveExecutor.memoryUsed = info.memoryUsed + liveExecutor.diskUsed = info.diskUsed + liveExecutor.maxTasks = info.maxTasks + liveExecutor.maxMemory = info.maxMemory + liveExecutor.totalTasks = info.totalTasks + liveExecutor.activeTasks = info.activeTasks + liveExecutor.completedTasks = info.completedTasks + liveExecutor.failedTasks = info.failedTasks + liveExecutor.totalDuration = info.totalDuration + liveExecutor.totalGcTime = info.totalGCTime + liveExecutor.totalInputBytes = info.totalInputBytes + liveExecutor.totalShuffleRead = info.totalShuffleRead + liveExecutor.totalShuffleWrite = info.totalShuffleWrite + liveExecutor.isBlacklisted = info.isBlacklisted + liveExecutor.blacklistedInStages = info.blacklistedInStages + liveExecutor.executorLogs = info.executorLogs + liveExecutor.attributes = info.attributes + liveExecutor.totalOnHeap = info.memoryMetrics.map(_.totalOnHeapStorageMemory).getOrElse(-1) + liveExecutor.totalOffHeap = info.memoryMetrics.map(_.totalOffHeapStorageMemory).getOrElse(0) + liveExecutor.usedOnHeap = info.memoryMetrics.map(_.usedOnHeapStorageMemory).getOrElse(0) + liveExecutor.usedOffHeap = info.memoryMetrics.map(_.usedOffHeapStorageMemory).getOrElse(0) + liveExecutor.peakExecutorMetrics = info.peakMemoryMetrics.getOrElse(new ExecutorMetrics()) + liveExecutor + } } /** @@ -76,6 +110,29 @@ private[spark] class JobDataWrapper( @JsonIgnore @KVIndex("completionTime") private def completionTime: Long = info.completionTime.map(_.getTime).getOrElse(-1L) + + def toLiveJob: LiveJob = { + val liveJob = new LiveJob( + info.jobId, + info.name, + info.submissionTime, + info.stageIds, + info.jobGroup, + info.numTasks, + sqlExecutionId) + liveJob.activeTasks = info.numActiveTasks + liveJob.completedTasks = info.numCompletedTasks + liveJob.failedTasks = info.numFailedTasks + liveJob.numCompletedIndices = info.numCompletedIndices + liveJob.killedTasks = info.numKilledTasks + liveJob.killedSummary = 
info.killedTasksSummary + liveJob.skippedTasks = info.numSkippedTasks + liveJob.skippedStages = skippedStages + liveJob.numCompletedStages = info.numCompletedStages + liveJob.activeStages = info.numActiveStages + liveJob.failedStages = info.numFailedStages + liveJob + } } private[spark] class StageDataWrapper( @@ -95,6 +152,82 @@ private[spark] class StageDataWrapper( @JsonIgnore @KVIndex("completionTime") private def completionTime: Long = info.completionTime.map(_.getTime).getOrElse(-1L) + + private def idAwareRDDInfos(rddIds: Seq[Int]): Seq[RDDInfo] = { + // It's safe to give arbitrary values except id while recovering RDDInfo, + // since a running LiveStage only concerns about rddInfo's id. + rddIds.map { id => + new RDDInfo(id, id.toString, 0, null, false, Nil) + } + } + + def toLiveStage(jobs: Seq[LiveJob]): LiveStage = { + val liveStage = new LiveStage + val firstLaunchTime = if (info.firstTaskLaunchedTime.isEmpty) { + Long.MaxValue + } else { + info.firstTaskLaunchedTime.get.getTime + } + val metrics = LiveEntityHelpers.createMetrics( + info.executorDeserializeTime, + info.executorDeserializeCpuTime, + info.executorRunTime, + info.executorCpuTime, + info.resultSize, + info.jvmGcTime, + info.resultSerializationTime, + info.memoryBytesSpilled, + info.diskBytesSpilled, + info.peakExecutionMemory, + info.inputBytes, + info.inputRecords, + info.outputBytes, + info.outputRecords, + info.shuffleRemoteBlocksFetched, + info.shuffleLocalBlocksFetched, + info.shuffleFetchWaitTime, + info.shuffleRemoteBytesRead, + info.shuffleRemoteBytesReadToDisk, + info.shuffleLocalBytesRead, + info.shuffleReadRecords, + info.shuffleWriteBytes, + info.shuffleWriteTime, + info.shuffleWriteRecords + ) + + // parentIds, taskMetrics, taskLocalityPreferences and shuffleDepId aren't assigned here + // but it's also OK since a running LiveStage don't visit these attributes. And we'll + // get a complete StageInfo again when we receive SparkListenerStageCompleted event. + val stageInfo = new StageInfo( + info.stageId, + info.attemptId, + info.name, + info.numTasks, + idAwareRDDInfos(info.rddIds), + Nil, // parentIds + info.details) + stageInfo.submissionTime = info.submissionTime.map(_.getTime) + + // Note that attributes for `executorSummaries`, `activeTasksPerExecutor`, + // `blackListedExecutors`, `savedTasks` are computed later in + // AppStatusListener.recoverLiveEntities(). 
+ liveStage.jobs = jobs + liveStage.jobIds = jobs.map(_.jobId).toSet + liveStage.info = stageInfo + liveStage.status = info.status + liveStage.description = info.description + liveStage.schedulingPool = info.schedulingPool + liveStage.activeTasks = info.numActiveTasks + liveStage.completedTasks = info.numCompleteTasks + liveStage.failedTasks = info.numFailedTasks + liveStage.numCompletedIndices = info.numCompletedIndices + liveStage.killedTasks = info.numKilledTasks + liveStage.killedSummary = info.killedTasksSummary + liveStage.firstLaunchTime = firstLaunchTime + liveStage.localitySummary = locality + liveStage.metrics = metrics + liveStage + } } /** @@ -290,6 +423,23 @@ private[spark] class TaskDataWrapper( gettingResultTime = 0L) } + def toLiveTask: LiveTask = { + val taskInfo = + new TaskInfo( + taskId, + index, + attempt, + launchTime, + executorId, + host, + TaskLocality.withName(taskLocality), + speculative) + taskInfo.gettingResultTime = gettingResultTime + val lastUpdateTime = duration + launchTime + val liveTask = new LiveTask(taskInfo, stageId, stageAttemptId, Some(lastUpdateTime)) + liveTask + } + @JsonIgnore @KVIndex(TaskIndexNames.STAGE) private def stage: Array[Int] = Array(stageId, stageAttemptId) @@ -360,6 +510,32 @@ private[spark] class RDDStorageInfoWrapper(val info: RDDStorageInfo) { @JsonIgnore @KVIndex("cached") def cached: Boolean = info.numCachedPartitions > 0 + def toLiveRDD(executors: scala.collection.Map[String, LiveExecutor]): LiveRDD = { + val storageLevel = StorageLevel.fromDescription(info.storageLevel) + val rddInfo = new RDDInfo( + info.id, + info.name, + info.numPartitions, + storageLevel, + false, + Nil) + val liveRDD = new LiveRDD(rddInfo, storageLevel) + liveRDD.memoryUsed = info.memoryUsed + liveRDD.diskUsed = info.diskUsed + info.partitions.get.foreach { rddPartition => + val liveRDDPartition = rddPartition.toLiveRDDPartition + liveRDD.partitions.put(rddPartition.blockName, liveRDDPartition) + liveRDD.partitionSeq.addPartition(liveRDDPartition) + } + if (info.dataDistribution.nonEmpty) { + info.dataDistribution.get.foreach { rddDist => + val liveRDDDist = rddDist.toLiveRDDDistribution(executors) + liveRDD.distributions.put(liveRDDDist.executorId, liveRDDDist) + } + } + liveRDD + } + } private[spark] class ExecutorStageSummaryWrapper( @@ -377,6 +553,41 @@ private[spark] class ExecutorStageSummaryWrapper( @JsonIgnore def id: Array[Any] = _id + def toLiveExecutorStageSummary: LiveExecutorStageSummary = { + val liveESSummary = new LiveExecutorStageSummary(stageId, stageAttemptId, executorId) + val metrics = LiveEntityHelpers.createMetrics( + executorDeserializeTime = 0, + executorDeserializeCpuTime = 0, + executorRunTime = 0, + executorCpuTime = 0, + resultSize = 0, + jvmGcTime = 0, + resultSerializationTime = 0, + memoryBytesSpilled = info.memoryBytesSpilled, + diskBytesSpilled = info.diskBytesSpilled, + peakExecutionMemory = 0, + inputBytesRead = info.inputBytes, + inputRecordsRead = info.inputRecords, + outputBytesWritten = info.outputBytes, + outputRecordsWritten = info.outputRecords, + shuffleRemoteBlocksFetched = 0, + shuffleLocalBlocksFetched = 0, + shuffleFetchWaitTime = 0, + shuffleRemoteBytesRead = info.shuffleRead, + shuffleRemoteBytesReadToDisk = 0, + shuffleLocalBytesRead = 0, + shuffleRecordsRead = info.shuffleReadRecords, + shuffleBytesWritten = info.shuffleWrite, + shuffleWriteTime = 0, + shuffleRecordsWritten = info.shuffleWriteRecords) + liveESSummary.taskTime = info.taskTime + liveESSummary.succeededTasks = info.succeededTasks + 
liveESSummary.failedTasks = info.failedTasks + liveESSummary.isBlacklisted = info.isBlacklistedForStage + liveESSummary.metrics = metrics + liveESSummary + } + } private[spark] class StreamBlockData( @@ -429,7 +640,14 @@ private[spark] class RDDOperationGraphWrapper( private[spark] class PoolData( @KVIndexParam val name: String, - val stageIds: Set[Int]) + val stageIds: Set[Int]) { + + def toSchedulerPool: SchedulerPool = { + val pool = new SchedulerPool(name) + pool.stageIds = stageIds + pool + } +} /** * A class with information about an app, to be used by the UI. There's only one instance of diff --git a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala index 4c6998d7a8e2..2ab3128b0428 100644 --- a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala +++ b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala @@ -163,6 +163,20 @@ object StorageLevel { val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2) val OFF_HEAP = new StorageLevel(true, true, true, false, 1) + /** + * :: DeveloperApi :: + * Return the StorageLevel object with the specified description. + */ + @DeveloperApi + def fromDescription(desc: String): StorageLevel = { + val (useDisk_, useMemory_, useOffHeap_, deserialized_) = { + (desc.contains("Disk"), desc.contains("Memory"), + desc.contains("off heap"), desc.contains("Deserialized")) + } + val replication_ = desc.split(" ").takeRight(2)(0).dropRight(1).toInt + new StorageLevel(useDisk_, useMemory_, useOffHeap_, deserialized_, replication_) + } + /** * :: DeveloperApi :: * Return the StorageLevel object with the specified name. diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala index 6bf163506e0c..081996e9c77a 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala @@ -23,6 +23,7 @@ import java.util.{Date, Properties} import scala.collection.JavaConverters._ import scala.collection.immutable.Map import scala.reflect.{classTag, ClassTag} +import scala.util.Random import org.scalatest.BeforeAndAfter @@ -35,6 +36,7 @@ import org.apache.spark.scheduler.cluster._ import org.apache.spark.status.api.v1 import org.apache.spark.storage._ import org.apache.spark.util.Utils +import org.apache.spark.util.kvstore.InMemoryStore class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { @@ -1656,6 +1658,337 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { } } + test("recover live entities from KVStore") { + def assertListenerEquals(live: AppStatusListener, nonLive: AppStatusListener) + : Unit = { + // ensures all live entities are wrote into KVStore + live.flush() + nonLive.clearLiveEntities() + nonLive.recoverLiveEntities() + assertLiveEntityEquals(live, nonLive) + } + + val kvstore = new ElementTrackingStore(new InMemoryStore, conf) + val liveListener = new AppStatusListener(kvstore, conf, live = true) + val nonLiveListener = new AppStatusListener(kvstore, conf, live = false) + var time = 1L + liveListener.onApplicationStart(SparkListenerApplicationStart( + "test", Some("appId"), time, "spark", Some("appId-attempt"))) + time += 1 + + val exec0 = createExecutorAddedEvent(0) + val exec1 = createExecutorAddedEvent(1) + liveListener.onExecutorAdded(exec0) + liveListener.onExecutorAdded(exec1) + 
assert(liveListener.liveExecutors.size === 2) + // hostPort is needed in LiveRDDDistribution + liveListener.liveExecutors.get("0").get.hostPort = exec0.executorInfo.executorHost + liveListener.liveExecutors.get("1").get.hostPort = exec1.executorInfo.executorHost + assertListenerEquals(liveListener, nonLiveListener) + + val level = StorageLevel.MEMORY_AND_DISK + val rddInfo0 = new RDDInfo(0, "rdd-0", 1, level, false, Nil) + val rddInfo1 = new RDDInfo(1, "rdd-1", 1, level, false, Nil) + val stage0 = createStageInfo(stageId = 0, attemptId = 0, 2, Seq(rddInfo0)) + val stage1 = createStageInfo(stageId = 1, attemptId = 0, 2, Seq(rddInfo1)) + val jobId = 0 + liveListener.onJobStart(SparkListenerJobStart(jobId, time, Seq(stage0, stage1))) + assert(liveListener.liveJobs.size === 1) + assert(liveListener.liveStages.size === 2) + assertListenerEquals(liveListener, nonLiveListener) + time +=1 + + liveListener.onStageSubmitted(SparkListenerStageSubmitted(stage0)) + liveListener.onStageSubmitted(SparkListenerStageSubmitted(stage1)) + assert(liveListener.liveRDDs.size === 2) + assertListenerEquals(liveListener, nonLiveListener) + time +=1 + + liveListener.onExecutorBlacklisted(SparkListenerExecutorBlacklisted(time, "0", 0)) + assert(liveListener.liveExecutors.get("0").get.isBlacklisted) + assertListenerEquals(liveListener, nonLiveListener) + time += 1 + liveListener.onExecutorUnblacklisted(SparkListenerExecutorUnblacklisted(time, "0")) + assert(!liveListener.liveExecutors.get("0").get.isBlacklisted) + assertListenerEquals(liveListener, nonLiveListener) + time += 1 + + val tasks = createTasks(4, Array("0", "1")) + // update some metrics for stages in order to validate metrics equation below + liveListener.liveStages.values().asScala.foreach { stage => + stage.metrics = createRandomV1TaskMetrics() + } + Seq(stage0, stage1).foreach { stage => + liveListener.onTaskStart(SparkListenerTaskStart( + stage.stageId, stage.attemptNumber(), tasks(stage.stageId * 2))) + liveListener.onTaskStart(SparkListenerTaskStart( + stage.stageId, stage.attemptNumber(), tasks(stage.stageId * 2 + 1))) + } + assert(liveListener.liveTasks.size === 4) + assertListenerEquals(liveListener, nonLiveListener) + time +=1 + + val bm0 = BlockManagerId("0", exec0.executorInfo.executorHost, 1234) + val rdd0 = RddBlock(0, 0, 1L, 2L) + val bm1 = BlockManagerId("1", exec1.executorInfo.executorHost, 4321) + val rdd1 = RddBlock(1, 0, 3L, 4L) + liveListener.onBlockUpdated(SparkListenerBlockUpdated( + BlockUpdatedInfo(bm0, rdd0.blockId, level, rdd0.memSize, rdd0.diskSize))) + liveListener.onBlockUpdated(SparkListenerBlockUpdated( + BlockUpdatedInfo(bm1, rdd1.blockId, level, rdd1.memSize, rdd1.diskSize))) + assertListenerEquals(liveListener, nonLiveListener) + + liveListener.onUnpersistRDD(SparkListenerUnpersistRDD(rddInfo0.id)) + assert(liveListener.liveRDDs.size === 1) + assertListenerEquals(liveListener, nonLiveListener) + liveListener.onUnpersistRDD(SparkListenerUnpersistRDD(rddInfo1.id)) + assert(liveListener.liveRDDs.size === 0) + assertListenerEquals(liveListener, nonLiveListener) + + val executorMetrics = new ExecutorMetrics(Array(7000L, 70L, 50L, 30L, 60L, + 30L, 100L, 55L, 70L, 20L, 8000L, 4000L, 7000L, 3000L, 6000L, 2000L)) + // finish task 0 and task 2 in stages + val task0 = tasks(stage0.stageId * 2) + val task2 = tasks(stage1.stageId * 2) + task0.finishTime = time + task2.finishTime = time + liveListener.onTaskEnd(SparkListenerTaskEnd(stage0.stageId, stage0.attemptNumber(), "task 0", + Success, task0, executorMetrics, 
createRandomTaskMetrics())) + liveListener.onTaskEnd(SparkListenerTaskEnd(stage1.stageId, stage1.attemptNumber(), "task 2", + Success, task2, executorMetrics, createRandomTaskMetrics())) + assert(liveListener.liveTasks.size === 2) + assertListenerEquals(liveListener, nonLiveListener) + time += 1 + + // finish task 1 and task 3 in stages + val task1 = tasks(stage0.stageId * 2 + 1) + val task3 = tasks(stage1.stageId * 2 + 1) + task1.finishTime = time + task3.finishTime = time + liveListener.onTaskEnd(SparkListenerTaskEnd(stage0.stageId, stage0.attemptNumber(), "task 1", + Success, task1, executorMetrics, createRandomTaskMetrics())) + liveListener.onTaskEnd(SparkListenerTaskEnd(stage1.stageId, stage1.attemptNumber(), "task 3", + Success, task3, executorMetrics, createRandomTaskMetrics())) + assert(liveListener.liveTasks.size === 0) + assertListenerEquals(liveListener, nonLiveListener) + + liveListener.onStageCompleted(SparkListenerStageCompleted(stage0)) + liveListener.onStageCompleted(SparkListenerStageCompleted(stage1)) + assert(liveListener.liveStages.size === 0) + assertListenerEquals(liveListener, nonLiveListener) + time += 1 + + liveListener.onJobEnd(SparkListenerJobEnd(jobId, time, JobSucceeded)) + assert(liveListener.liveJobs.size === 0) + assertListenerEquals(liveListener, nonLiveListener) + time += 1 + + liveListener.onApplicationEnd(SparkListenerApplicationEnd(time)) + assertListenerEquals(liveListener, nonLiveListener) + } + + private def assertLiveEntityEquals(src: AppStatusListener, dest: AppStatusListener) + : Unit = { + def assertLiveJobEquals(sJob: LiveJob, dJob: LiveJob): Unit = { + assert(sJob.jobId === dJob.jobId) + assert(sJob.name === dJob.name) + assert(sJob.submissionTime === dJob.submissionTime) + assert(sJob.stageIds === dJob.stageIds) + assert(sJob.jobGroup == dJob.jobGroup) + assert(sJob.numTasks == dJob.numTasks) + assert(sJob.sqlExecutionId == dJob.sqlExecutionId) + } + + def assertStageInfoEquals(sSInfo: StageInfo, dSInfo: StageInfo): Unit = { + assert(sSInfo.stageId === dSInfo.stageId) + assert(sSInfo.attemptNumber() === dSInfo.attemptNumber()) + assert(sSInfo.name === dSInfo.name) + assert(sSInfo.numTasks === dSInfo.numTasks) + assert(sSInfo.details === dSInfo.details) + } + + def assertTaskMetricsEquals( + sTM: v1.TaskMetrics, + dTM: v1.TaskMetrics): Unit = { + assert(sTM.executorDeserializeTime === dTM.executorDeserializeTime) + assert(sTM.executorDeserializeCpuTime === dTM.executorDeserializeCpuTime) + assert(sTM.executorRunTime === dTM.executorRunTime) + assert(sTM.executorCpuTime === dTM.executorCpuTime) + assert(sTM.resultSize === dTM.resultSize) + assert(sTM.jvmGcTime === dTM.jvmGcTime) + assert(sTM.resultSerializationTime === dTM.resultSerializationTime) + assert(sTM.memoryBytesSpilled === dTM.memoryBytesSpilled) + assert(sTM.diskBytesSpilled === dTM.diskBytesSpilled) + assert(sTM.peakExecutionMemory === dTM.peakExecutionMemory) + + val sIM = sTM.inputMetrics + val dIM = dTM.inputMetrics + assert(sIM.bytesRead === dIM.bytesRead) + assert(sIM.recordsRead === dIM.recordsRead) + + val sOM = sTM.outputMetrics + val dOM = dTM.outputMetrics + assert(sOM.bytesWritten === dOM.bytesWritten) + assert(sOM.recordsWritten === dOM.recordsWritten) + + val sRM = sTM.shuffleReadMetrics + val dRM = dTM.shuffleReadMetrics + assert(sRM.remoteBlocksFetched === dRM.remoteBlocksFetched) + assert(sRM.localBlocksFetched === dRM.localBlocksFetched) + assert(sRM.fetchWaitTime === dRM.fetchWaitTime) + assert(sRM.remoteBytesRead === dRM.remoteBytesRead) + 
assert(sRM.remoteBytesReadToDisk === dRM.remoteBytesReadToDisk) + assert(sRM.localBytesRead === dRM.localBytesRead) + assert(sRM.recordsRead === dRM.recordsRead) + + val sWM = sTM.shuffleWriteMetrics + val dWM = sTM.shuffleWriteMetrics + assert(sWM.bytesWritten === dWM.bytesWritten) + assert(sWM.writeTime === dWM.writeTime) + assert(sWM.recordsWritten === dWM.recordsWritten) + } + + def assertTaskInfoEquals(sTInfo: TaskInfo, dTInfo: TaskInfo): Unit = { + assert(sTInfo.taskId === dTInfo.taskId) + assert(sTInfo.index === dTInfo.index) + assert(sTInfo.attemptNumber === dTInfo.attemptNumber) + assert(sTInfo.launchTime === dTInfo.launchTime) + assert(sTInfo.executorId === dTInfo.executorId) + assert(sTInfo.host === dTInfo.host) + assert(sTInfo.taskLocality === dTInfo.taskLocality) + assert(sTInfo.speculative === dTInfo.speculative) + } + val srcExecutors = src.liveExecutors + val destExecutors = dest.liveExecutors + assert(srcExecutors.size === destExecutors.size) + srcExecutors.keys.foreach { execId => + val sExec = srcExecutors.get(execId).get + val dExec = destExecutors.get(execId).get + assert(sExec.addTime === dExec.addTime) + assert(sExec.host === dExec.host) + assert(sExec.hostPort === dExec.hostPort) + assert(sExec.totalCores === dExec.totalCores) + assert(sExec.rddBlocks === dExec.rddBlocks) + assert(sExec.memoryUsed === dExec.memoryUsed) + assert(sExec.diskUsed === dExec.diskUsed) + assert(sExec.maxTasks === dExec.maxTasks) + assert(sExec.maxMemory === dExec.maxMemory) + assert(sExec.totalTasks === dExec.totalTasks) + assert(sExec.activeTasks === dExec.activeTasks) + assert(sExec.completedTasks === dExec.completedTasks) + assert(sExec.failedTasks === dExec.failedTasks) + assert(sExec.totalDuration === dExec.totalDuration) + assert(sExec.totalGcTime === dExec.totalGcTime) + assert(sExec.totalInputBytes === dExec.totalInputBytes) + assert(sExec.totalShuffleRead === dExec.totalShuffleRead) + assert(sExec.totalShuffleWrite === dExec.totalShuffleWrite) + assert(sExec.isBlacklisted === dExec.isBlacklisted) + assert(sExec.blacklistedInStages === dExec.blacklistedInStages) + // return false indicates that there're no updates between these two metrics + assert(!sExec.peakExecutorMetrics.compareAndUpdatePeakValues(dExec.peakExecutorMetrics)) + } + + val srcJobs = src.liveJobs + val destJobs = dest.liveJobs + assert(srcJobs.size === destJobs.size) + srcJobs.keys.foreach { jobId => + val sJob = srcJobs.get(jobId).get + val dJob = destJobs.get(jobId).get + assertLiveJobEquals(sJob, dJob) + } + val srcStages = src.liveStages + val destStages = dest.liveStages + assert(srcStages.size() === destStages.size()) + srcStages.keys().asScala.foreach { stageId => + val sStage = srcStages.get(stageId) + val dStage = destStages.get(stageId) + val sStageJobs = sStage.jobs.sortBy(_.jobId) + val dStageJobs = dStage.jobs.sortBy(_.jobId) + assert(sStageJobs.size === dStageJobs.size) + sStageJobs.zip(dStageJobs).foreach {case (sJob, dJob) => + assertLiveJobEquals(sJob, dJob) } + assert(sStage.jobIds.size === dStage.jobs.size) + assert(sStage.jobIds === dStage.jobIds) + assertStageInfoEquals(sStage.info, dStage.info) + assert(sStage.status === dStage.status) + assert(sStage.description === dStage.description) + assert(sStage.schedulingPool === dStage.schedulingPool) + assert(sStage.activeTasks === dStage.activeTasks) + assert(sStage.completedTasks === dStage.completedTasks) + assert(sStage.completedIndices.size === dStage.numCompletedIndices) + assert(sStage.killedTasks === dStage.killedTasks) + 
assert(sStage.killedSummary === dStage.killedSummary) + assert(sStage.firstLaunchTime === dStage.firstLaunchTime) + assert(sStage.localitySummary === dStage.localitySummary) + assertTaskMetricsEquals(sStage.metrics, dStage.metrics) + val sSummaries = sStage.executorSummaries + val dSummaries = dStage.executorSummaries + assert(sSummaries.size === dSummaries.size) + sSummaries.keys.foreach { execId => + val sSummary = sSummaries.get(execId).get + val dSummary = dSummaries.get(execId).get + assert(sSummary.executorId === dSummary.executorId) + assert(sSummary.taskTime === dSummary.taskTime) + assert(sSummary.succeededTasks === dSummary.succeededTasks) + assert(sSummary.failedTasks === dSummary.failedTasks) + assert(sSummary.killedTasks === dSummary.killedTasks) + assert(sSummary.isBlacklisted === dSummary.isBlacklisted) + } + // we only compare executors with active tasks to those recovered executors, + // because executors with non active tasks wouldn't be recovered. + assert(sStage.activeTasksPerExecutor.filter(_._2 > 0) === dStage.activeTasksPerExecutor) + assert(sStage.blackListedExecutors === dStage.blackListedExecutors) + } + val srcTasks = src.liveTasks + val destTasks = dest.liveTasks + assert(srcTasks.size === destTasks.size) + srcTasks.keys.foreach { taskId => + val sTask = srcTasks.get(taskId).get + val dTask = destTasks.get(taskId).get + assertTaskInfoEquals(sTask.info, dTask.info) + } + val srcRDDs = src.liveRDDs + val destRDDs = dest.liveRDDs + assert(srcRDDs.size === destRDDs.size) + srcRDDs.keys.foreach { rddId => + val sRDD = srcRDDs.get(rddId).get + val dRDD = destRDDs.get(rddId).get + assert(sRDD.info.id === dRDD.info.id) + assert(sRDD.info.name === dRDD.info.name) + assert(sRDD.info.numPartitions === dRDD.info.numPartitions) + assert(sRDD.info.storageLevel === dRDD.info.storageLevel) + assert(sRDD.memoryUsed === dRDD.memoryUsed) + assert(sRDD.diskUsed === dRDD.diskUsed) + val sRDDPartitions = sRDD.partitions + val dRDDPartitions = dRDD.partitions + assert(sRDDPartitions.size === dRDDPartitions.size) + sRDDPartitions.keys.foreach { block => + val sPartition = sRDDPartitions.get(block).get + val dPartition = dRDDPartitions.get(block).get + assert(sPartition.executors === dPartition.executors) + assert(sPartition.memoryUsed === dPartition.memoryUsed) + assert(sPartition.diskUsed === dPartition.diskUsed) + } + val sRDDDists = sRDD.distributions + val dRDDDists = dRDD.distributions + assert(sRDDDists.size === dRDDDists.size) + sRDDDists.keys.foreach { execId => + val sDist = sRDDDists.get(execId).get + val dDist = dRDDDists.get(execId).get + assert(sDist.executorId === dDist.executorId) + assert(sDist.memoryUsed === dDist.memoryUsed) + assert(sDist.diskUsed === dDist.diskUsed) + } + } + val srcPools = src.pools + val destPools = dest.pools + srcPools.keys.foreach { name => + val sPool = srcPools.get(name).get + val dPool = destPools.get(name).get + assert(sPool.name === dPool.name) + assert(sPool.stageIds === dPool.stageIds) + } + } private def key(stage: StageInfo): Array[Int] = Array(stage.stageId, stage.attemptNumber) @@ -1729,4 +2062,42 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { val executorUpdates = Map((stageId, 0) -> new ExecutorMetrics(executorMetrics)) SparkListenerExecutorMetricsUpdate(executorId.toString, accum, executorUpdates) } + + private def createStageInfo(stageId: Int, attemptId: Int, numTasks: Int, rddInfos: Seq[RDDInfo]) + : StageInfo = { + new StageInfo(stageId = stageId, + attemptId = attemptId, + name = 
s"stage-$stageId-$attemptId", + numTasks = numTasks, + rddInfos = rddInfos, + parentIds = Nil, + details = s"stage-$stageId-$attemptId") + } + + private def createRandomV1TaskMetrics(): v1.TaskMetrics = { + val rnd = new Random(System.nanoTime()) + LiveEntityHelpers.createMetrics( + rnd.nextLong(), rnd.nextLong(), rnd.nextLong(), rnd.nextLong(), rnd.nextLong(), + rnd.nextLong(), rnd.nextLong(), rnd.nextLong(), rnd.nextLong(), rnd.nextLong(), + rnd.nextLong(), rnd.nextLong(), rnd.nextLong(), rnd.nextLong(), rnd.nextLong(), + rnd.nextLong(), rnd.nextLong(), rnd.nextLong(), rnd.nextLong(), rnd.nextLong(), + rnd.nextLong(), rnd.nextLong(), rnd.nextLong(), rnd.nextLong() + ) + } + + private def createRandomTaskMetrics(): TaskMetrics = { + val rnd = new Random(System.nanoTime()) + val taskMetrics = new TaskMetrics() + taskMetrics.setExecutorDeserializeTime(rnd.nextLong()) + taskMetrics.setExecutorDeserializeCpuTime(rnd.nextLong()) + taskMetrics.setExecutorRunTime(rnd.nextLong()) + taskMetrics.setExecutorCpuTime(rnd.nextLong()) + taskMetrics.setResultSize(rnd.nextLong()) + taskMetrics.setJvmGCTime(rnd.nextLong()) + taskMetrics.setResultSerializationTime(rnd.nextLong()) + taskMetrics.incMemoryBytesSpilled(rnd.nextLong()) + taskMetrics.incDiskBytesSpilled(rnd.nextLong()) + taskMetrics.incPeakExecutionMemory(rnd.nextLong()) + taskMetrics + } } diff --git a/core/src/test/scala/org/apache/spark/storage/StorageLevelSuite.scala b/core/src/test/scala/org/apache/spark/storage/StorageLevelSuite.scala new file mode 100644 index 000000000000..bdd931cbf3bb --- /dev/null +++ b/core/src/test/scala/org/apache/spark/storage/StorageLevelSuite.scala @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.storage + +import org.apache.spark.SparkFunSuite + + +class StorageLevelSuite extends SparkFunSuite{ + + + private def testFromDescription(oldLevel: StorageLevel): Unit = { + val desc = oldLevel.description + val newLevel = StorageLevel.fromDescription(desc) + assert(oldLevel === newLevel) + } + + test("from description") { + testFromDescription(StorageLevel.NONE) + testFromDescription(StorageLevel.DISK_ONLY) + testFromDescription(StorageLevel.DISK_ONLY_2) + testFromDescription(StorageLevel.MEMORY_ONLY) + testFromDescription(StorageLevel.MEMORY_ONLY_2) + testFromDescription(StorageLevel.MEMORY_ONLY_SER) + testFromDescription(StorageLevel.MEMORY_ONLY_SER_2) + testFromDescription(StorageLevel.MEMORY_AND_DISK) + testFromDescription(StorageLevel.MEMORY_AND_DISK_2) + testFromDescription(StorageLevel.MEMORY_AND_DISK_SER) + testFromDescription(StorageLevel.MEMORY_AND_DISK_SER_2) + testFromDescription(StorageLevel.OFF_HEAP) + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala index 2c4a7eacdf10..8c1171f71d7f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala @@ -42,8 +42,9 @@ class SQLAppStatusListener( // Live tracked data is needed by the SQL status store to calculate metrics for in-flight // executions; that means arbitrary threads may be querying these maps, so they need to be // thread-safe. - private val liveExecutions = new ConcurrentHashMap[Long, LiveExecutionData]() - private val stageMetrics = new ConcurrentHashMap[Int, LiveStageMetrics]() + // variables are visible for tests. + private[spark] val liveExecutions = new ConcurrentHashMap[Long, LiveExecutionData]() + private[spark] val stageMetrics = new ConcurrentHashMap[Int, LiveStageMetrics]() // Returns true if this listener has no live data. Exposed for tests only. private[sql] def noLiveData(): Boolean = { @@ -67,6 +68,28 @@ class SQLAppStatusListener( } } + // visible for tests + private[spark] def recoverLiveEntities(): Unit = { + def isRunningExecution(execUI: SQLExecutionUIData): Boolean = { + execUI.completionTime.isEmpty || execUI.jobs.exists(_._2 == JobExecutionStatus.RUNNING) + } + if (!live) { + kvstore.view(classOf[SQLExecutionUIData]) + .asScala.filter(isRunningExecution) + .map(_.toLiveExecutionData) + .foreach(execData => liveExecutions.put(execData.executionId, execData)) + kvstore.view(classOf[SQLStageMetricsWrapper]) + .asScala.map(_.toLiveStageMetrics) + .foreach(metrics => stageMetrics.put(metrics.stageId, metrics)) + } + } + + // used for tests only + private[spark] def clearLiveEntities(): Unit = { + liveExecutions.clear() + stageMetrics.clear() + } + override def onJobStart(event: SparkListenerJobStart): Unit = { val executionIdString = event.properties.getProperty(SQLExecution.EXECUTION_ID_KEY) if (executionIdString == null) { @@ -82,17 +105,7 @@ class SQLAppStatusListener( // Should not overwrite the kvstore with new entry, if it already has the SQLExecution // data corresponding to the execId. 
val sqlStoreData = kvstore.read(classOf[SQLExecutionUIData], executionId) - val executionData = new LiveExecutionData(executionId) - executionData.description = sqlStoreData.description - executionData.details = sqlStoreData.details - executionData.physicalPlanDescription = sqlStoreData.physicalPlanDescription - executionData.metrics = sqlStoreData.metrics - executionData.submissionTime = sqlStoreData.submissionTime - executionData.completionTime = sqlStoreData.completionTime - executionData.jobs = sqlStoreData.jobs - executionData.stages = sqlStoreData.stages - executionData.metricsValues = sqlStoreData.metricValues - executionData.endEvents = sqlStoreData.jobs.size + 1 + val executionData = sqlStoreData.toLiveExecutionData liveExecutions.put(executionId, executionData) Some(executionData) } catch { @@ -328,7 +341,18 @@ class SQLAppStatusListener( }.toSet stageMetrics.keySet().asScala .filter(!activeStages.contains(_)) - .foreach(stageMetrics.remove) + .foreach { stageId => + stageMetrics.remove(stageId) + if (live) { + try { + kvstore.delete(classOf[SQLStageMetricsWrapper], stageId) + } catch { + case _: NoSuchElementException => + // Ignore. It's possible that LiveStageMetrics is removed + // before its first time to write to KVStore + } + } + } } private def onDriverAccumUpdates(event: SparkListenerDriverAccumUpdates): Unit = { @@ -367,6 +391,15 @@ class SQLAppStatusListener( } } + // visible for tests + private[spark] def flush(): Unit = { + if (live) { + val now = System.nanoTime() + liveExecutions.values().asScala.foreach(_.write(kvstore, now)) + stageMetrics.values().asScala.foreach(_.write(kvstore, now)) + } + } + private def isSQLStage(stageId: Int): Boolean = { liveExecutions.values().asScala.exists { exec => exec.stages.contains(stageId) @@ -389,7 +422,7 @@ class SQLAppStatusListener( } -private class LiveExecutionData(val executionId: Long) extends LiveEntity { +private[spark] class LiveExecutionData(val executionId: Long) extends LiveEntity { var description: String = null var details: String = null @@ -424,13 +457,28 @@ private class LiveExecutionData(val executionId: Long) extends LiveEntity { } -private class LiveStageMetrics( +private[spark] class LiveStageMetrics( val stageId: Int, var attemptId: Int, val accumulatorIds: Set[Long], val taskMetrics: ConcurrentHashMap[Long, LiveTaskMetrics]) + extends LiveEntity { + + override protected def doUpdate() = { + new SQLStageMetricsWrapper( + stageId, + attemptId, + accumulatorIds, + taskMetrics.asScala.map { case (taskId, metrics) => + (taskId, metrics.toApi) + }.toMap + ) + } +} -private class LiveTaskMetrics( +private[spark] class LiveTaskMetrics( val ids: Array[Long], val values: Array[Long], - val succeeded: Boolean) + val succeeded: Boolean) { + def toApi: SQLTaskMetricsWrapper = new SQLTaskMetricsWrapper(ids, values, succeeded) +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala index 241001a857c8..2ebb27e8e428 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.ui import java.lang.{Long => JLong} import java.util.Date +import java.util.concurrent.ConcurrentHashMap import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer @@ -99,8 +100,58 @@ class SQLExecutionUIData( @JsonIgnore 
@KVIndex("completionTime") private def completionTimeIndex: Long = completionTime.map(_.getTime).getOrElse(-1L) + + def toLiveExecutionData: LiveExecutionData = { + val liveExecutionData = new LiveExecutionData(executionId) + liveExecutionData.description = description + liveExecutionData.details = details + liveExecutionData.physicalPlanDescription = physicalPlanDescription + liveExecutionData.metrics = metrics + liveExecutionData.submissionTime = submissionTime + liveExecutionData.completionTime = completionTime + liveExecutionData.jobs = jobs + liveExecutionData.stages = stages + liveExecutionData.metricsValues = metricValues + val endNum = jobs.count { case (_, status) => + status == JobExecutionStatus.SUCCEEDED || status == JobExecutionStatus.FAILED } + liveExecutionData.endEvents = endNum + { if (completionTime.isDefined) 1 else 0 } + liveExecutionData + } } +/** + * Used to recover LiveStageMetrics in SQLAppStatusListener. It would be written into KVStore + * continuously and deleted when related LiveStageMetrics is no longer used in SQLAppStatusListener + */ +class SQLStageMetricsWrapper( + @KVIndexParam + val stageId: Int, + val attemptId: Int, + @JsonDeserialize(contentAs = classOf[Long]) + val accumulatorIds: Set[Long], + @JsonDeserialize(keyAs = classOf[Long], contentAs = classOf[LiveTaskMetrics]) + val taskMetrics: Map[Long, SQLTaskMetricsWrapper]) { + def toLiveStageMetrics: LiveStageMetrics = { + val liveTaskMetrics = new ConcurrentHashMap[Long, LiveTaskMetrics]() + val metricsMap = taskMetrics.map { case (taskId, metrics) => + (taskId, new LiveTaskMetrics(metrics.ids, metrics.values, metrics.succeeded)) + } + liveTaskMetrics.putAll(metricsMap.asJava) + new LiveStageMetrics( + stageId, + attemptId, + accumulatorIds, + liveTaskMetrics) + } +} + +class SQLTaskMetricsWrapper( + @JsonDeserialize(contentAs = classOf[Long]) + val ids: Array[Long], + @JsonDeserialize(contentAs = classOf[Long]) + val values: Array[Long], + val succeeded: Boolean) + class SparkPlanGraphWrapper( @KVIndexParam val executionId: Long, val nodes: Seq[SparkPlanGraphNodeWrapper], diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala index 88864ccec752..80dd8d8bcd69 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.ui import java.util.Properties +import scala.collection.JavaConverters._ import scala.collection.mutable.ListBuffer import org.json4s.jackson.JsonMethods._ @@ -609,6 +610,101 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils assert(statusStore.executionsCount === 2) assert(statusStore.execution(2) === None) } + + test("recover live entities from KVStore") { + def assertSQLListenerEquals(live: SQLAppStatusListener, nonLive: SQLAppStatusListener) + : Unit = { + // ensures all live entities are wrote into KVStore + live.flush() + nonLive.clearLiveEntities() + nonLive.recoverLiveEntities() + assertSQLLiveEntityEquals(live, nonLive) + } + val conf = sparkContext.conf + kvstore = new ElementTrackingStore(new InMemoryStore, conf) + val liveListener = new SQLAppStatusListener(conf, kvstore, live = true) + val nonLiveListener = new SQLAppStatusListener(conf, kvstore, live = false) + var time = 1 + + 
liveListener.onOtherEvent(SparkListenerSQLExecutionStart( + 0, "desc", "details", "planDesc", + new SparkPlanInfo("node", "test", Nil, null, Nil), time + )) + assert(liveListener.liveExecutions.size() === 1) + assertSQLListenerEquals(liveListener, nonLiveListener) + time += 1 + + val stage0 = createStageInfo(stageId = 0, attemptId = 0) + val stage1 = createStageInfo(stageId = 1, attemptId = 0) + val properties = createProperties(executionId = 0) + liveListener.onJobStart(SparkListenerJobStart( + 0, time, Seq(stage0, stage1), properties)) + assert(liveListener.stageMetrics.size() === 2) + assertSQLListenerEquals(liveListener, nonLiveListener) + time += 1 + + liveListener.onStageSubmitted(SparkListenerStageSubmitted(stage0)) + liveListener.onStageSubmitted(SparkListenerStageSubmitted(stage1)) + liveListener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate( + "execId-0", Seq((0, 0, 0, createAccumulatorInfos(Map((0, 0), (1, 1))))))) + liveListener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate( + "execId-0", Seq((0, 1, 1, createAccumulatorInfos(Map((0, 0), (1, 1))))))) + assertSQLListenerEquals(liveListener, nonLiveListener) + + liveListener.onJobEnd(SparkListenerJobEnd(0, time, JobSucceeded)) + assertSQLListenerEquals(liveListener, nonLiveListener) + time += 1 + + liveListener.onOtherEvent(SparkListenerSQLExecutionEnd(0, time)) + assert(liveListener.liveExecutions.size() === 0) + assertSQLListenerEquals(liveListener, nonLiveListener) + + // SQLStageMetricsWrapper should be clean up after SQL executions end + assert(kvstore.count(classOf[SQLStageMetricsWrapper]) === 0) + } + + private def assertSQLLiveEntityEquals(src: SQLAppStatusListener, dest: SQLAppStatusListener) + : Unit = { + val srcExecutions = src.liveExecutions + val destExecutions = dest.liveExecutions + assert(srcExecutions.size() === destExecutions.size()) + srcExecutions.keys().asScala.foreach { id => + val srcExec = srcExecutions.get(id) + val destExec = destExecutions.get(id) + assert(srcExec.executionId === destExec.executionId) + assert(srcExec.description === destExec.description) + assert(srcExec.details === destExec.details) + assert(srcExec.physicalPlanDescription === destExec.physicalPlanDescription) + assert(srcExec.metrics === destExec.metrics) + assert(srcExec.submissionTime === destExec.submissionTime) + assert(srcExec.completionTime === destExec.completionTime) + assert(srcExec.jobs === destExec.jobs) + assert(srcExec.stages === destExec.stages) + assert(srcExec.driverAccumUpdates === destExec.driverAccumUpdates) + assert(srcExec.metricsValues === destExec.metricsValues) + assert(srcExec.endEvents === destExec.endEvents) + } + val srcStageMetrics = src.stageMetrics + val destStageMetrics = dest.stageMetrics + assert(srcStageMetrics.size() === destStageMetrics.size()) + srcStageMetrics.keys().asScala.foreach { id => + val srcStageM = srcStageMetrics.get(id) + val destStageM = destStageMetrics.get(id) + assert(srcStageM.stageId === destStageM.stageId) + assert(srcStageM.attemptId === destStageM.attemptId) + assert(srcStageM.accumulatorIds === destStageM.accumulatorIds) + val srcTaskMetrics = srcStageM.taskMetrics + val destTaskMetrics = destStageM.taskMetrics + assert(srcTaskMetrics.size() === destTaskMetrics.size()) + srcTaskMetrics.keys().asScala.foreach { id => + val srcTaskM = srcTaskMetrics.get(id) + val destTaskM = destTaskMetrics.get(id) + assert(srcTaskM.ids === destTaskM.ids) + assert(srcTaskM.values === destTaskM.values) + assert(srcTaskM.succeeded === destTaskM.succeeded) + } + } + } }
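A usage sketch (illustration only, not part of the patch above): the recovery path this change adds can be exercised the same way the new AppStatusListenerSuite test does — build an ElementTrackingStore over an InMemoryStore, create a non-live AppStatusListener on top of it, and call the new private[spark] recoverLiveEntities() helper. Only the object name RecoverLiveEntitiesSketch is hypothetical; the constructor arguments, store types, and method names all appear in the diff.

package org.apache.spark.status

import org.apache.spark.SparkConf
import org.apache.spark.util.kvstore.InMemoryStore

// Hypothetical sketch: a listener created with live = false rebuilds its
// in-memory live entities (jobs, stages, executors, tasks, RDDs, pools)
// from whatever a previous live listener flushed into the KVStore.
object RecoverLiveEntitiesSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    val kvstore = new ElementTrackingStore(new InMemoryStore, conf)

    // A live listener would normally have written RUNNING jobs, ACTIVE stages,
    // active executors, etc. into `kvstore` before this point.
    val recovered = new AppStatusListener(kvstore, conf, live = false)
    recovered.recoverLiveEntities()
    // `recovered` now holds the same live entities the original listener had,
    // instead of starting from an empty state.
  }
}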