From 86d109d508504efa2f480b2fa16e9a7fe99034be Mon Sep 17 00:00:00 2001
From: Shahid
Date: Wed, 17 Apr 2019 13:35:50 +0800
Subject: [PATCH 1/4] [SPARK-27468] "Storage Level" in "RDD Storage Page" is
 not correct

---
 .../spark/status/AppStatusListener.scala      | 18 ++++++++-
 .../org/apache/spark/status/LiveEntity.scala  |  3 +-
 .../spark/status/AppStatusListenerSuite.scala | 38 +++++++++++++++++++
 3 files changed, 57 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
index 71e0390f39a4..80f014826f5e 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -917,8 +917,24 @@ private[spark] class AppStatusListener(
     // Update the block entry in the RDD info, keeping track of the deltas above so that we
     // can update the executor information too.
     liveRDDs.get(block.rddId).foreach { rdd =>
+
       if (updatedStorageLevel.isDefined) {
-        rdd.setStorageLevel(updatedStorageLevel.get)
+        // Replicated block update events will have `storageLevel.replication = 1`.
+        // To avoid overwriting the original block event in the store, we need to
+        // check whether the event is a block replication event or not.
+        // The default value of `storageInfo.replication` is 1, so if
+        // `storageLevel.replication = 2`, replicated events won't overwrite the store entry.
+        val storageInfo = rdd.storageInfo
+        val isReplicatedBlockUpdateEvent = storageLevel.replication < storageInfo.replication &&
+          (storageInfo.useDisk == storageLevel.useDisk &&
+            storageInfo.useMemory == storageLevel.useMemory &&
+            storageInfo.deserialized == storageLevel.deserialized &&
+            storageInfo.useOffHeap == storageLevel.useOffHeap)
+
+        if (!isReplicatedBlockUpdateEvent) {
+          rdd.storageInfo = storageLevel
+          rdd.setStorageLevel(updatedStorageLevel.get)
+        }
       }
 
       val partition = rdd.partition(block.name)
diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
index 6d7b34ae979f..f0593da4c787 100644
--- a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
+++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
@@ -29,7 +29,7 @@ import org.apache.spark.JobExecutionStatus
 import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
 import org.apache.spark.scheduler.{AccumulableInfo, StageInfo, TaskInfo}
 import org.apache.spark.status.api.v1
-import org.apache.spark.storage.RDDInfo
+import org.apache.spark.storage.{RDDInfo, StorageLevel}
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.AccumulatorContext
 import org.apache.spark.util.collection.OpenHashSet
@@ -510,6 +510,7 @@ private class LiveRDD(val info: RDDInfo) extends LiveEntity {
   var storageLevel: String = weakIntern(info.storageLevel.description)
   var memoryUsed = 0L
   var diskUsed = 0L
+  var storageInfo: StorageLevel = new StorageLevel()
 
   private val partitions = new HashMap[String, LiveRDDPartition]()
   private val partitionSeq = new RDDPartitionSeq()
diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
index b5800661efa7..736af5feec94 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
@@ -1520,6 +1520,44 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
     }
   }
 
+  test("storage description should display correct replication if storage replication is 2") {
+    val listener = new AppStatusListener(store, conf, true)
+    // Register a couple of block managers.
+    val bm1 = BlockManagerId("1", "host-1", 1234)
+    val bm2 = BlockManagerId("2", "host-2", 2345)
+
+    Seq(bm1, bm2).foreach { bm =>
+      listener.onExecutorAdded(SparkListenerExecutorAdded(1L, bm.executorId,
+        new ExecutorInfo(bm.host, 1, Map.empty, Map.empty)))
+      listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm, 42L))
+    }
+    val rddBlock = RddBlock(1, 1, 1L, 2L)
+
+    val level = StorageLevel.MEMORY_AND_DISK_2
+    // `replication` value of the replicated block will be 1.
+    val levelBlockReplica = StorageLevel.MEMORY_AND_DISK
+
+    // Submit a stage and make sure the RDDs are recorded.
+    val rdd1Info = new RDDInfo(rddBlock.rddId, "rdd1", 2, level, Nil)
+    val stage = new StageInfo(1, 0, "stage1", 4, Seq(rdd1Info), Nil, "details1")
+    listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new Properties()))
+
+    // Block update event, where replication = 2
+    listener.onBlockUpdated(SparkListenerBlockUpdated(
+      BlockUpdatedInfo(bm1, rddBlock.blockId, level, rddBlock.memSize, rddBlock.diskSize)))
+    // Block update event, where replication = 1
+    listener.onBlockUpdated(SparkListenerBlockUpdated(
+      BlockUpdatedInfo(bm2, rddBlock.blockId, levelBlockReplica, rddBlock.memSize, rddBlock.diskSize)))
+
+    check[RDDStorageInfoWrapper](rddBlock.rddId) { wrapper =>
+      val partitionInfo = wrapper.info.partitions.get.find(_.blockName === rddBlock.blockId.name).get
+      assert(partitionInfo.storageLevel === level.description)
+      assert(partitionInfo.memoryUsed === 2 * rddBlock.memSize)
+      assert(partitionInfo.diskUsed === 2 * rddBlock.diskSize)
+      assert(partitionInfo.executors === Seq(bm1.executorId, bm2.executorId))
+    }
+  }
+
   test("storage information on executor lost/down") {
     val listener = new AppStatusListener(store, conf, true)
     val maxMemory = 42L

From 8d3c32e4cd4c71db866418a0296a4a8a3679fc2f Mon Sep 17 00:00:00 2001
From: Shahid
Date: Thu, 18 Apr 2019 15:01:28 +0800
Subject: [PATCH 2/4] scalastyle

---
 .../org/apache/spark/status/AppStatusListenerSuite.scala | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
index 736af5feec94..986be9771bbe 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
@@ -1547,10 +1547,12 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
       BlockUpdatedInfo(bm1, rddBlock.blockId, level, rddBlock.memSize, rddBlock.diskSize)))
     // Block update event, where replication = 1
     listener.onBlockUpdated(SparkListenerBlockUpdated(
-      BlockUpdatedInfo(bm2, rddBlock.blockId, levelBlockReplica, rddBlock.memSize, rddBlock.diskSize)))
+      BlockUpdatedInfo(bm2, rddBlock.blockId, levelBlockReplica, rddBlock.memSize,
+        rddBlock.diskSize)))
 
     check[RDDStorageInfoWrapper](rddBlock.rddId) { wrapper =>
-      val partitionInfo = wrapper.info.partitions.get.find(_.blockName === rddBlock.blockId.name).get
+      val partitionInfo = wrapper.info.partitions.get.find(_.blockName === rddBlock.blockId.name)
+        .get
       assert(partitionInfo.storageLevel === level.description)
       assert(partitionInfo.memoryUsed === 2 * rddBlock.memSize)
       assert(partitionInfo.diskUsed === 2 * rddBlock.diskSize)
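Note on the approach in PATCH 1: the block manager reports each replica with `storageLevel.replication = 1`, so an update that matches the RDD's tracked level on every flag but carries a lower replication count is treated as a replica event and skipped, which keeps the original `MEMORY_AND_DISK_2` description in the store. Below is a minimal standalone sketch of that predicate; `Level` and `isReplicaUpdate` are illustrative stand-ins, not Spark's `StorageLevel` API. (PATCH 4 later drops this heuristic in favour of deriving the level directly from `RDDInfo`.)

  object ReplicaCheckSketch {
    final case class Level(
        useDisk: Boolean,
        useMemory: Boolean,
        useOffHeap: Boolean,
        deserialized: Boolean,
        replication: Int)

    // A block update counts as a replica event when it matches the RDD's
    // tracked level on every flag but carries a lower replication count.
    def isReplicaUpdate(event: Level, tracked: Level): Boolean =
      event.replication < tracked.replication &&
        event.useDisk == tracked.useDisk &&
        event.useMemory == tracked.useMemory &&
        event.deserialized == tracked.deserialized &&
        event.useOffHeap == tracked.useOffHeap

    def main(args: Array[String]): Unit = {
      // MEMORY_AND_DISK_2 as the RDD's tracked level ...
      val tracked = Level(useDisk = true, useMemory = true, useOffHeap = false,
        deserialized = true, replication = 2)
      // ... and the level the block manager reports for each replica.
      val replica = tracked.copy(replication = 1)

      println(isReplicaUpdate(tracked, tracked)) // false: first update is stored
      println(isReplicaUpdate(replica, tracked)) // true: replica must not overwrite
    }
  }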
From fbcc0c72ee97ca2862632f3c9e2b481d95520152 Mon Sep 17 00:00:00 2001
From: Shahid
Date: Thu, 18 Apr 2019 23:35:21 +0800
Subject: [PATCH 3/4] address comment

---
 core/src/main/scala/org/apache/spark/status/LiveEntity.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
index f0593da4c787..e9bf561af145 100644
--- a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
+++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
@@ -510,7 +510,7 @@ private class LiveRDD(val info: RDDInfo) extends LiveEntity {
   var storageLevel: String = weakIntern(info.storageLevel.description)
   var memoryUsed = 0L
   var diskUsed = 0L
-  var storageInfo: StorageLevel = new StorageLevel()
+  var storageInfo: StorageLevel = info.storageLevel
 
   private val partitions = new HashMap[String, LiveRDDPartition]()
   private val partitionSeq = new RDDPartitionSeq()

From a22cd68cfd3aa2e4c249b34c32261b90d88ecf68 Mon Sep 17 00:00:00 2001
From: Shahid
Date: Tue, 7 May 2019 12:18:21 +0530
Subject: [PATCH 4/4] address comments

---
 .../spark/status/AppStatusListener.scala     | 19 -------------------
 .../org/apache/spark/status/LiveEntity.scala |  8 +++-----
 2 files changed, 3 insertions(+), 24 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
index 80f014826f5e..eee9d8455f17 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -918,25 +918,6 @@ private[spark] class AppStatusListener(
     // can update the executor information too.
     liveRDDs.get(block.rddId).foreach { rdd =>
-
-      if (updatedStorageLevel.isDefined) {
-        // Replicated block update events will have `storageLevel.replication = 1`.
-        // To avoid overwriting the original block event in the store, we need to
-        // check whether the event is a block replication event or not.
-        // The default value of `storageInfo.replication` is 1, so if
-        // `storageLevel.replication = 2`, replicated events won't overwrite the store entry.
-        val storageInfo = rdd.storageInfo
-        val isReplicatedBlockUpdateEvent = storageLevel.replication < storageInfo.replication &&
-          (storageInfo.useDisk == storageLevel.useDisk &&
-            storageInfo.useMemory == storageLevel.useMemory &&
-            storageInfo.deserialized == storageLevel.deserialized &&
-            storageInfo.useOffHeap == storageLevel.useOffHeap)
-
-        if (!isReplicatedBlockUpdateEvent) {
-          rdd.storageInfo = storageLevel
-          rdd.setStorageLevel(updatedStorageLevel.get)
-        }
-      }
 
       val partition = rdd.partition(block.name)
 
       val executors = if (updatedStorageLevel.isDefined) {
diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
index e9bf561af145..24f364ae013e 100644
--- a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
+++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
@@ -29,7 +29,7 @@ import org.apache.spark.JobExecutionStatus
 import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
 import org.apache.spark.scheduler.{AccumulableInfo, StageInfo, TaskInfo}
 import org.apache.spark.status.api.v1
-import org.apache.spark.storage.{RDDInfo, StorageLevel}
+import org.apache.spark.storage.RDDInfo
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.AccumulatorContext
 import org.apache.spark.util.collection.OpenHashSet
@@ -507,18 +507,16 @@ private class LiveRDD(val info: RDDInfo) extends LiveEntity {
 
   import LiveEntityHelpers._
 
-  var storageLevel: String = weakIntern(info.storageLevel.description)
   var memoryUsed = 0L
   var diskUsed = 0L
-  var storageInfo: StorageLevel = info.storageLevel
 
   private val partitions = new HashMap[String, LiveRDDPartition]()
   private val partitionSeq = new RDDPartitionSeq()
 
   private val distributions = new HashMap[String, LiveRDDDistribution]()
 
-  def setStorageLevel(level: String): Unit = {
-    this.storageLevel = weakIntern(level)
+  def storageLevel: String = {
+    weakIntern(info.storageLevel.description)
   }
 
   def partition(blockName: String): LiveRDDPartition = {
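Note on the final shape of the series: PATCH 4 removes the event-side filtering entirely and instead derives the storage-level description from the authoritative `info.storageLevel` on every read, so there is no cached mutable copy for a replica update to corrupt. Below is a minimal sketch of that design; `RddInfo`, `LevelInfo`, and `LiveRdd` are simplified illustrative stand-ins, not Spark's real classes.

  object LiveRddSketch {
    final case class LevelInfo(description: String)
    final class RddInfo(var storageLevel: LevelInfo)

    final class LiveRdd(val info: RddInfo) {
      // Derived on demand: always consistent with whatever `info` currently holds.
      def storageLevel: String = info.storageLevel.description
    }

    def main(args: Array[String]): Unit = {
      val info = new RddInfo(LevelInfo("Disk Memory Deserialized 2x Replicated"))
      val rdd = new LiveRdd(info)
      println(rdd.storageLevel) // Disk Memory Deserialized 2x Replicated

      // Even if a later event mutates the level, the next read reflects it;
      // nothing stale survives because nothing is cached.
      info.storageLevel = LevelInfo("Disk Memory Deserialized 1x Replicated")
      println(rdd.storageLevel) // Disk Memory Deserialized 1x Replicated
    }
  }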