diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
index 5ea161cd0d15..411196438539 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -646,8 +646,17 @@ private[spark] class AppStatusListener(
   }
 
   override def onUnpersistRDD(event: SparkListenerUnpersistRDD): Unit = {
-    liveRDDs.remove(event.rddId)
-    kvstore.delete(classOf[RDDStorageInfoWrapper], event.rddId)
+    liveRDDs.get(event.rddId) match {
+      case Some(rdd) =>
+        // Only drop the entry once the RDD no longer holds any storage.
+        if (rdd.isEmpty()) {
+          liveRDDs.remove(event.rddId)
+          kvstore.delete(classOf[RDDStorageInfoWrapper], event.rddId)
+        }
+      case None =>
+        // Not tracked (or already removed); nothing to clean up.
+        ()
+    }
   }
 
   override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = {
@@ -710,9 +719,21 @@ private[spark] class AppStatusListener(
     val executorId = event.blockUpdatedInfo.blockManagerId.executorId
 
     // Whether values are being added to or removed from the existing accounting.
+    // The BlockManager reports an empty block status (StorageLevel.NONE) when an RDD block is
+    // removed, so the removed block's size is taken from the tracked RDD partition instead.
     val storageLevel = event.blockUpdatedInfo.storageLevel
-    val diskDelta = event.blockUpdatedInfo.diskSize * (if (storageLevel.useDisk) 1 else -1)
-    val memoryDelta = event.blockUpdatedInfo.memSize * (if (storageLevel.useMemory) 1 else -1)
+    val diskDelta = if (storageLevel != StorageLevel.NONE) {
+      event.blockUpdatedInfo.diskSize
+    } else {
+      // The block is being removed; subtract the size previously recorded for this partition.
+      liveRDDs.get(block.rddId).map { rdd => -rdd.partition(block.name).diskUsed }.getOrElse(0L)
+    }
+    val memoryDelta = if (storageLevel != StorageLevel.NONE) {
+      event.blockUpdatedInfo.memSize
+    } else {
+      // The block is being removed; subtract the size previously recorded for this partition.
+      liveRDDs.get(block.rddId).map { rdd => -rdd.partition(block.name).memoryUsed }.getOrElse(0L)
+    }
 
     // Function to apply a delta to a value, but ensure that it doesn't go negative.
     def newValue(old: Long, delta: Long): Long = math.max(0, old + delta)
@@ -810,7 +831,7 @@ private[spark] class AppStatusListener(
     // Finish updating the executor now that we know the delta in the number of blocks.
     maybeExec.foreach { exec =>
      exec.rddBlocks += rddBlocksDelta
-      maybeUpdate(exec, now)
+      update(exec, now)
     }
   }
 
diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
index 79e3f13b826c..062983a7047c 100644
--- a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
+++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
@@ -20,11 +20,7 @@ package org.apache.spark.status
 import java.util.Date
 import java.util.concurrent.atomic.AtomicInteger
 
-import scala.collection.immutable.{HashSet, TreeSet}
-import scala.collection.mutable.HashMap
-
 import com.google.common.collect.Interners
-
 import org.apache.spark.JobExecutionStatus
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.scheduler.{AccumulableInfo, StageInfo, TaskInfo}
@@ -33,7 +29,9 @@ import org.apache.spark.storage.RDDInfo
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.AccumulatorContext
 import org.apache.spark.util.collection.OpenHashSet
-import org.apache.spark.util.kvstore.KVStore
+
+import scala.collection.immutable.{HashSet, TreeSet}
+import scala.collection.mutable.HashMap
 
 /**
  * A mutable representation of a live entity in Spark (jobs, stages, tasks, et al). Every live
@@ -120,7 +118,7 @@ private class LiveTask(
     stageAttemptId: Int,
     lastUpdateTime: Option[Long]) extends LiveEntity {
 
-  import LiveEntityHelpers._
+  import org.apache.spark.status.LiveEntityHelpers._
 
   // The task metrics use a special value when no metrics have been reported. The special value is
   // checked when calculating indexed values when writing to the store (see [[TaskDataWrapper]]).
@@ -313,7 +311,7 @@ private class LiveExecutorStageSummary(
     attemptId: Int,
     executorId: String) extends LiveEntity {
 
-  import LiveEntityHelpers._
+  import org.apache.spark.status.LiveEntityHelpers._
 
   var taskTime = 0L
   var succeededTasks = 0
@@ -347,7 +345,7 @@ private class LiveExecutorStageSummary(
 
 private class LiveStage extends LiveEntity {
 
-  import LiveEntityHelpers._
+  import org.apache.spark.status.LiveEntityHelpers._
 
   var jobs = Seq[LiveJob]()
   var jobIds = Set[Int]()
@@ -436,7 +434,7 @@ private class LiveStage extends LiveEntity {
 
 private class LiveRDDPartition(val blockName: String) {
 
-  import LiveEntityHelpers._
+  import org.apache.spark.status.LiveEntityHelpers._
 
   // Pointers used by RDDPartitionSeq.
   @volatile var prev: LiveRDDPartition = null
@@ -467,7 +465,7 @@ private class LiveRDDPartition(val blockName: String) {
 
 private class LiveRDDDistribution(exec: LiveExecutor) {
 
-  import LiveEntityHelpers._
+  import org.apache.spark.status.LiveEntityHelpers._
 
   val executorId = exec.executorId
   var memoryUsed = 0L
@@ -498,7 +496,7 @@ private class LiveRDDDistribution(exec: LiveExecutor) {
 
 private class LiveRDD(val info: RDDInfo) extends LiveEntity {
 
-  import LiveEntityHelpers._
+  import org.apache.spark.status.LiveEntityHelpers._
 
   var storageLevel: String = weakIntern(info.storageLevel.description)
   var memoryUsed = 0L
@@ -509,6 +507,10 @@ private class LiveRDD(val info: RDDInfo) extends LiveEntity {
 
   private val distributions = new HashMap[String, LiveRDDDistribution]()
 
+  def isEmpty(): Boolean = {
+    memoryUsed == 0L && diskUsed == 0L && partitions.isEmpty && distributions.isEmpty
+  }
+
   def setStorageLevel(level: String): Unit = {
     this.storageLevel = weakIntern(level)
   }
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index e7cdfab99b34..504d5217b002 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1549,7 +1549,7 @@ private[spark] class BlockManager(
     // TODO: Avoid a linear scan by creating another mapping of RDD.id to blocks.
     logInfo(s"Removing RDD $rddId")
     val blocksToRemove = blockInfoManager.entries.flatMap(_._1.asRDDId).filter(_.rddId == rddId)
-    blocksToRemove.foreach { blockId => removeBlock(blockId, tellMaster = false) }
+    blocksToRemove.foreach { blockId => removeBlock(blockId, tellMaster = true) }
     blocksToRemove.size
   }
 