From dd5f766e0f270cfc58ca4298c39179469f021f78 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Thu, 30 Aug 2018 23:17:46 +0000 Subject: [PATCH 1/2] Update memory and disk info when unpersist rdds. --- .../spark/status/AppStatusListener.scala | 43 ++++++++++++++++++- .../org/apache/spark/status/LiveEntity.scala | 8 ++++ .../org/apache/spark/storage/RDDInfo.scala | 2 +- .../spark/status/AppStatusListenerSuite.scala | 29 +++++++++++++ 4 files changed, 80 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index 5ea161cd0d15..2ae10a4d5d20 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -23,6 +23,7 @@ import java.util.function.Function import scala.collection.JavaConverters._ import scala.collection.mutable.HashMap +import scala.collection.mutable.HashSet import org.apache.spark._ import org.apache.spark.executor.TaskMetrics @@ -646,7 +647,47 @@ private[spark] class AppStatusListener( } override def onUnpersistRDD(event: SparkListenerUnpersistRDD): Unit = { - liveRDDs.remove(event.rddId) + liveRDDs.remove(event.rddId).foreach { liveRDD => + val executorsToUpdate = new HashSet[LiveExecutor]() + val storageLevel = liveRDD.info.storageLevel + val distributions = liveRDD.getDistributions() + + // Use RDD distribution to update executor memory and disk usage info. + distributions.foreach { case (executorId, rddDist) => + val maybeExec = liveExecutors.get(executorId) + + maybeExec.foreach { exec => + if (exec.hasMemoryInfo) { + if (storageLevel.useOffHeap) { + exec.usedOffHeap = math.max(0, exec.usedOffHeap - rddDist.offHeapUsed) + } else { + exec.usedOnHeap = math.max(0, exec.usedOnHeap - rddDist.onHeapUsed) + } + } + exec.memoryUsed = math.max(0, exec.memoryUsed - rddDist.memoryUsed) + exec.diskUsed = math.max(0, exec.diskUsed - rddDist.diskUsed) + executorsToUpdate += exec + } + } + + // Use RDD partition info to update executor block info. 
+ val partitions = liveRDD.getPartitions() + + partitions.foreach { case (_, part) => + val executors = part.executors + executors.foreach { executorId => + val maybeExec = liveExecutors.get(executorId) + + maybeExec.foreach { exec => + exec.rddBlocks = exec.rddBlocks - 1 + executorsToUpdate += exec + } + } + } + val now = System.nanoTime() + executorsToUpdate.foreach(maybeUpdate(_, now)) + } + kvstore.delete(classOf[RDDStorageInfoWrapper], event.rddId) } diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala index 79e3f13b826c..6f463604e174 100644 --- a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala +++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala @@ -538,6 +538,14 @@ private class LiveRDD(val info: RDDInfo) extends LiveEntity { distributions.get(exec.executorId) } + def getPartitions(): Map[String, LiveRDDPartition] = { + partitions.toMap + } + + def getDistributions(): Map[String, LiveRDDDistribution] = { + distributions.toMap + } + override protected def doUpdate(): Any = { val dists = if (distributions.nonEmpty) { Some(distributions.values.map(_.toApi()).toSeq) diff --git a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala index 9ccc8f9cc585..64e5c8b1c4bb 100644 --- a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala +++ b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala @@ -55,7 +55,7 @@ class RDDInfo( } private[spark] object RDDInfo { - private val callsiteForm = SparkEnv.get.conf.get(EVENT_LOG_CALLSITE_FORM) + private lazy val callsiteForm = SparkEnv.get.conf.get(EVENT_LOG_CALLSITE_FORM) def fromRdd(rdd: RDD[_]): RDDInfo = { val rddName = Option(rdd.name).getOrElse(Utils.getFormattedClassName(rdd)) diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala index 1b3639ad64a7..698fc60523ee 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala @@ -881,12 +881,41 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { assert(dist.memoryRemaining === maxMemory - rdd2b1.memSize - rdd1b2.memSize ) } + // Add block1 of rdd1 back to bm 1. + listener.onBlockUpdated(SparkListenerBlockUpdated( + BlockUpdatedInfo(bm1, rdd1b1.blockId, level, rdd1b1.memSize, rdd1b1.diskSize))) + + check[ExecutorSummaryWrapper](bm1.executorId) { exec => + assert(exec.info.rddBlocks === 3L) + assert(exec.info.memoryUsed === rdd1b1.memSize + rdd1b2.memSize + rdd2b1.memSize) + assert(exec.info.diskUsed === rdd1b1.diskSize + rdd1b2.diskSize + rdd2b1.diskSize) + } + // Unpersist RDD1. listener.onUnpersistRDD(SparkListenerUnpersistRDD(rdd1b1.rddId)) intercept[NoSuchElementException] { check[RDDStorageInfoWrapper](rdd1b1.rddId) { _ => () } } + // executor1 now only contains block1 from rdd2. + check[ExecutorSummaryWrapper](bm1.executorId) { exec => + assert(exec.info.rddBlocks === 1L) + assert(exec.info.memoryUsed === rdd2b1.memSize) + assert(exec.info.diskUsed === rdd2b1.diskSize) + } + + // Unpersist RDD2. 
+ listener.onUnpersistRDD(SparkListenerUnpersistRDD(rdd2b1.rddId)) + intercept[NoSuchElementException] { + check[RDDStorageInfoWrapper](rdd2b1.rddId) { _ => () } + } + + check[ExecutorSummaryWrapper](bm1.executorId) { exec => + assert(exec.info.rddBlocks === 0L) + assert(exec.info.memoryUsed === 0) + assert(exec.info.diskUsed === 0) + } + // Update a StreamBlock. val stream1 = StreamBlockId(1, 1L) listener.onBlockUpdated(SparkListenerBlockUpdated( From 7c76790c9fab6d695aa45f7efbba766660c0e2a8 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 11 Sep 2018 07:45:10 +0000 Subject: [PATCH 2/2] Address comments. --- .../spark/status/AppStatusListener.scala | 79 ++++++++----------- .../org/apache/spark/status/LiveEntity.scala | 8 +- 2 files changed, 37 insertions(+), 50 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index 2ae10a4d5d20..2e5268659647 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -23,7 +23,6 @@ import java.util.function.Function import scala.collection.JavaConverters._ import scala.collection.mutable.HashMap -import scala.collection.mutable.HashSet import org.apache.spark._ import org.apache.spark.executor.TaskMetrics @@ -648,44 +647,34 @@ private[spark] class AppStatusListener( override def onUnpersistRDD(event: SparkListenerUnpersistRDD): Unit = { liveRDDs.remove(event.rddId).foreach { liveRDD => - val executorsToUpdate = new HashSet[LiveExecutor]() val storageLevel = liveRDD.info.storageLevel - val distributions = liveRDD.getDistributions() - // Use RDD distribution to update executor memory and disk usage info. - distributions.foreach { case (executorId, rddDist) => - val maybeExec = liveExecutors.get(executorId) - - maybeExec.foreach { exec => - if (exec.hasMemoryInfo) { - if (storageLevel.useOffHeap) { - exec.usedOffHeap = math.max(0, exec.usedOffHeap - rddDist.offHeapUsed) - } else { - exec.usedOnHeap = math.max(0, exec.usedOnHeap - rddDist.onHeapUsed) - } + // Use RDD partition info to update executor block info. + liveRDD.getPartitions().foreach { case (_, part) => + part.executors.foreach { executorId => + liveExecutors.get(executorId).foreach { exec => + exec.rddBlocks = exec.rddBlocks - 1 } - exec.memoryUsed = math.max(0, exec.memoryUsed - rddDist.memoryUsed) - exec.diskUsed = math.max(0, exec.diskUsed - rddDist.diskUsed) - executorsToUpdate += exec } } - // Use RDD partition info to update executor block info. - val partitions = liveRDD.getPartitions() - - partitions.foreach { case (_, part) => - val executors = part.executors - executors.foreach { executorId => - val maybeExec = liveExecutors.get(executorId) + val now = System.nanoTime() - maybeExec.foreach { exec => - exec.rddBlocks = exec.rddBlocks - 1 - executorsToUpdate += exec + // Use RDD distribution to update executor memory and disk usage info. 
+ liveRDD.getDistributions().foreach { case (executorId, rddDist) => + liveExecutors.get(executorId).foreach { exec => + if (exec.hasMemoryInfo) { + if (storageLevel.useOffHeap) { + exec.usedOffHeap = addDeltaToValue(exec.usedOffHeap, -rddDist.offHeapUsed) + } else { + exec.usedOnHeap = addDeltaToValue(exec.usedOnHeap, -rddDist.onHeapUsed) + } } + exec.memoryUsed = addDeltaToValue(exec.memoryUsed, -rddDist.memoryUsed) + exec.diskUsed = addDeltaToValue(exec.diskUsed, -rddDist.diskUsed) + maybeUpdate(exec, now) } } - val now = System.nanoTime() - executorsToUpdate.foreach(maybeUpdate(_, now)) } kvstore.delete(classOf[RDDStorageInfoWrapper], event.rddId) @@ -746,6 +735,11 @@ private[spark] class AppStatusListener( .sortBy(_.stageId) } + /** + * Apply a delta to a value, but ensure that it doesn't go negative. + */ + private def addDeltaToValue(old: Long, delta: Long): Long = math.max(0, old + delta) + private def updateRDDBlock(event: SparkListenerBlockUpdated, block: RDDBlockId): Unit = { val now = System.nanoTime() val executorId = event.blockUpdatedInfo.blockManagerId.executorId @@ -755,9 +749,6 @@ private[spark] class AppStatusListener( val diskDelta = event.blockUpdatedInfo.diskSize * (if (storageLevel.useDisk) 1 else -1) val memoryDelta = event.blockUpdatedInfo.memSize * (if (storageLevel.useMemory) 1 else -1) - // Function to apply a delta to a value, but ensure that it doesn't go negative. - def newValue(old: Long, delta: Long): Long = math.max(0, old + delta) - val updatedStorageLevel = if (storageLevel.isValid) { Some(storageLevel.description) } else { @@ -774,13 +765,13 @@ private[spark] class AppStatusListener( maybeExec.foreach { exec => if (exec.hasMemoryInfo) { if (storageLevel.useOffHeap) { - exec.usedOffHeap = newValue(exec.usedOffHeap, memoryDelta) + exec.usedOffHeap = addDeltaToValue(exec.usedOffHeap, memoryDelta) } else { - exec.usedOnHeap = newValue(exec.usedOnHeap, memoryDelta) + exec.usedOnHeap = addDeltaToValue(exec.usedOnHeap, memoryDelta) } } - exec.memoryUsed = newValue(exec.memoryUsed, memoryDelta) - exec.diskUsed = newValue(exec.diskUsed, diskDelta) + exec.memoryUsed = addDeltaToValue(exec.memoryUsed, memoryDelta) + exec.diskUsed = addDeltaToValue(exec.diskUsed, diskDelta) } // Update the block entry in the RDD info, keeping track of the deltas above so that we @@ -808,8 +799,8 @@ private[spark] class AppStatusListener( // Only update the partition if it's still stored in some executor, otherwise get rid of it. 
if (executors.nonEmpty) { partition.update(executors, rdd.storageLevel, - newValue(partition.memoryUsed, memoryDelta), - newValue(partition.diskUsed, diskDelta)) + addDeltaToValue(partition.memoryUsed, memoryDelta), + addDeltaToValue(partition.diskUsed, diskDelta)) } else { rdd.removePartition(block.name) } @@ -817,14 +808,14 @@ private[spark] class AppStatusListener( maybeExec.foreach { exec => if (exec.rddBlocks + rddBlocksDelta > 0) { val dist = rdd.distribution(exec) - dist.memoryUsed = newValue(dist.memoryUsed, memoryDelta) - dist.diskUsed = newValue(dist.diskUsed, diskDelta) + dist.memoryUsed = addDeltaToValue(dist.memoryUsed, memoryDelta) + dist.diskUsed = addDeltaToValue(dist.diskUsed, diskDelta) if (exec.hasMemoryInfo) { if (storageLevel.useOffHeap) { - dist.offHeapUsed = newValue(dist.offHeapUsed, memoryDelta) + dist.offHeapUsed = addDeltaToValue(dist.offHeapUsed, memoryDelta) } else { - dist.onHeapUsed = newValue(dist.onHeapUsed, memoryDelta) + dist.onHeapUsed = addDeltaToValue(dist.onHeapUsed, memoryDelta) } } dist.lastUpdate = null @@ -843,8 +834,8 @@ private[spark] class AppStatusListener( } } - rdd.memoryUsed = newValue(rdd.memoryUsed, memoryDelta) - rdd.diskUsed = newValue(rdd.diskUsed, diskDelta) + rdd.memoryUsed = addDeltaToValue(rdd.memoryUsed, memoryDelta) + rdd.diskUsed = addDeltaToValue(rdd.diskUsed, diskDelta) update(rdd, now) } diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala index 6f463604e174..ce737ac0e769 100644 --- a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala +++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala @@ -538,13 +538,9 @@ private class LiveRDD(val info: RDDInfo) extends LiveEntity { distributions.get(exec.executorId) } - def getPartitions(): Map[String, LiveRDDPartition] = { - partitions.toMap - } + def getPartitions(): scala.collection.Map[String, LiveRDDPartition] = partitions - def getDistributions(): Map[String, LiveRDDDistribution] = { - distributions.toMap - } + def getDistributions(): scala.collection.Map[String, LiveRDDDistribution] = distributions override protected def doUpdate(): Any = { val dists = if (distributions.nonEmpty) {
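
The following is a minimal, self-contained Scala sketch of the accounting these two commits introduce: a clamp-at-zero delta helper (mirroring the addDeltaToValue method added to AppStatusListener) and the per-executor subtraction performed when an RDD is unpersisted. ExecUsage, RDDDist, and releaseRdd are hypothetical stand-ins used only for illustration; they are not the real LiveExecutor, LiveRDDDistribution, or onUnpersistRDD code in core/src/main/scala/org/apache/spark/status/.

// Simplified sketch of the unpersist accounting added by this patch.
// ExecUsage and RDDDist are illustrative stand-ins, not Spark classes.
object UnpersistAccountingSketch {

  // Mirrors AppStatusListener.addDeltaToValue: apply a delta but never go negative.
  def addDeltaToValue(old: Long, delta: Long): Long = math.max(0, old + delta)

  case class RDDDist(memoryUsed: Long, diskUsed: Long, onHeapUsed: Long, offHeapUsed: Long)

  case class ExecUsage(
      rddBlocks: Int,
      memoryUsed: Long,
      diskUsed: Long,
      usedOnHeap: Long,
      usedOffHeap: Long,
      hasMemoryInfo: Boolean)

  // What onUnpersistRDD now does for each executor that held part of the RDD:
  // subtract the RDD's per-executor distribution from the executor totals,
  // clamping at zero, and drop the RDD's blocks from the executor's block count.
  def releaseRdd(
      exec: ExecUsage,
      dist: RDDDist,
      blocksOnExec: Int,
      useOffHeap: Boolean): ExecUsage = {
    val (onHeap, offHeap) =
      if (!exec.hasMemoryInfo) (exec.usedOnHeap, exec.usedOffHeap)
      else if (useOffHeap) (exec.usedOnHeap, addDeltaToValue(exec.usedOffHeap, -dist.offHeapUsed))
      else (addDeltaToValue(exec.usedOnHeap, -dist.onHeapUsed), exec.usedOffHeap)
    exec.copy(
      rddBlocks = exec.rddBlocks - blocksOnExec,
      memoryUsed = addDeltaToValue(exec.memoryUsed, -dist.memoryUsed),
      diskUsed = addDeltaToValue(exec.diskUsed, -dist.diskUsed),
      usedOnHeap = onHeap,
      usedOffHeap = offHeap)
  }

  def main(args: Array[String]): Unit = {
    val exec = ExecUsage(rddBlocks = 3, memoryUsed = 300L, diskUsed = 120L,
      usedOnHeap = 300L, usedOffHeap = 0L, hasMemoryInfo = true)
    val dist = RDDDist(memoryUsed = 200L, diskUsed = 100L, onHeapUsed = 200L, offHeapUsed = 0L)
    // After unpersisting an RDD that held 2 blocks on this executor:
    println(releaseRdd(exec, dist, blocksOnExec = 2, useOffHeap = false))
    // prints ExecUsage(1,100,20,100,0,true)
  }
}

Clamping at zero matches the existing updateRDDBlock behavior, so a duplicate or out-of-order unpersist event cannot drive the reported memory or disk usage negative.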