@@ -59,17 +59,6 @@ class StorageStatusListener extends SparkListener {
}
}

override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
val info = taskEnd.taskInfo
val metrics = taskEnd.taskMetrics
if (info != null && metrics != null) {
val updatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
if (updatedBlocks.length > 0) {
updateStorageStatus(info.executorId, updatedBlocks)
}
}
}

override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD): Unit = synchronized {
updateStorageStatus(unpersistRDD.rddId)
}
@@ -91,4 +80,15 @@ class StorageStatusListener extends SparkListener {
}
}

override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = {
val executorId = blockUpdated.blockUpdatedInfo.blockManagerId.executorId
val blockId = blockUpdated.blockUpdatedInfo.blockId
val storageLevel = blockUpdated.blockUpdatedInfo.storageLevel
val memSize = blockUpdated.blockUpdatedInfo.memSize
val diskSize = blockUpdated.blockUpdatedInfo.diskSize
val externalSize = blockUpdated.blockUpdatedInfo.externalBlockStoreSize
val blockStatus = BlockStatus(storageLevel, memSize, diskSize, externalSize)
updateStorageStatus(executorId, Seq((blockId, blockStatus)))
}

}
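
The net effect of this file's change: StorageStatusListener no longer scrapes TaskMetrics.updatedBlocks at task end, but reacts to each SparkListenerBlockUpdated event as it arrives. Below is a minimal sketch of the new path, using only types that appear in this diff; the BlockManagerId values are made up, and the package layout is assumed to match this branch.

```scala
import org.apache.spark.scheduler.{SparkListenerBlockManagerAdded, SparkListenerBlockUpdated}
import org.apache.spark.storage._

val listener = new StorageStatusListener

// Register a (hypothetical) executor's block manager first.
val bm = BlockManagerId("exec-1", "localhost", 7077)
listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm, 1000L))

// One event per block change, replacing the old per-task batch
// harvested from TaskMetrics.updatedBlocks.
val update = BlockUpdatedInfo(bm, RDDBlockId(0, 0), StorageLevel.MEMORY_ONLY, 100L, 0L, 0L)
listener.onBlockUpdated(SparkListenerBlockUpdated(update))

// The public storage status list now reflects the single cached block.
assert(listener.storageStatusList.map(_.numBlocks).sum == 1)
```

Driving the listener per block rather than per task presumably also keeps the storage view current for blocks updated outside task completion, such as evictions.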
core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala (+11 −11)
@@ -57,17 +57,6 @@ class StorageListener(storageStatusListener: StorageStatusListener) extends Bloc
StorageUtils.updateRddInfo(rddInfosToUpdate, storageStatusList)
}

/**
* Assumes the storage status list is fully up-to-date. This implies the corresponding
* StorageStatusSparkListener must process the SparkListenerTaskEnd event before this listener.
*/
override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
val metrics = taskEnd.taskMetrics
if (metrics != null && metrics.updatedBlocks.isDefined) {
updateRDDInfo(metrics.updatedBlocks.get)
}
}

override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = synchronized {
val rddInfos = stageSubmitted.stageInfo.rddInfos
rddInfos.foreach { info => _rddInfoMap.getOrElseUpdate(info.id, info) }
@@ -84,4 +73,15 @@ class StorageListener(storageStatusListener: StorageStatusListener) extends Bloc
override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD): Unit = synchronized {
_rddInfoMap.remove(unpersistRDD.rddId)
}

override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = {
super.onBlockUpdated(blockUpdated)
val blockId = blockUpdated.blockUpdatedInfo.blockId
val storageLevel = blockUpdated.blockUpdatedInfo.storageLevel
val memSize = blockUpdated.blockUpdatedInfo.memSize
val diskSize = blockUpdated.blockUpdatedInfo.diskSize
val externalSize = blockUpdated.blockUpdatedInfo.externalBlockStoreSize
val blockStatus = BlockStatus(storageLevel, memSize, diskSize, externalSize)
updateRDDInfo(Seq((blockId, blockStatus)))
}
}
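
The scaladoc removed above warned that StorageStatusListener had to process each event before this listener did, an ordering that depended on listener registration order on the bus. The new override makes the ordering explicit instead: super.onBlockUpdated refreshes the underlying per-executor state before updateRDDInfo aggregates from it. A self-contained sketch of that super-first pattern follows, with simplified stand-in types rather than the real Spark classes:

```scala
// Base layer: tracks raw per-block state, analogous to the storage
// status maintained by the parent listener.
class RawStatusListener {
  protected var blockSizes = Map.empty[String, Long] // blockId -> bytes
  def onBlockUpdated(blockId: String, bytes: Long): Unit = {
    if (bytes > 0) blockSizes += blockId -> bytes else blockSizes -= blockId
  }
}

// Derived layer: aggregates the raw state, analogous to the per-RDD
// summaries on the storage tab.
class RddSummaryListener extends RawStatusListener {
  var totalBytes = 0L
  override def onBlockUpdated(blockId: String, bytes: Long): Unit = {
    super.onBlockUpdated(blockId, bytes) // refresh raw state first...
    totalBytes = blockSizes.values.sum   // ...then aggregate from it
  }
}
```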
@@ -1,8 +1,8 @@
[ {
"id" : "<driver>",
"hostPort" : "localhost:57971",
"rddBlocks" : 8,
"memoryUsed" : 28000128,
"rddBlocks" : 0,
"memoryUsed" : 0,
"diskUsed" : 0,
"activeTasks" : 0,
"failedTasks" : 1,
@@ -1,9 +1 @@
[ {
"id" : 0,
"name" : "0",
"numPartitions" : 8,
"numCachedPartitions" : 8,
"storageLevel" : "Memory Deserialized 1x Replicated",
"memoryUsed" : 28000128,
"diskUsed" : 0
} ]
[ ]
@@ -126,8 +126,9 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
"stage task list from multi-attempt app json(2)" ->
"applications/local-1426533911241/2/stages/0/0/taskList",

"rdd list storage json" -> "applications/local-1422981780767/storage/rdd",
"one rdd storage json" -> "applications/local-1422981780767/storage/rdd/0"
"rdd list storage json" -> "applications/local-1422981780767/storage/rdd"
// TODO: enable this test when the onBlockUpdated event is logged. See: SPARK-13845
// "one rdd storage json" -> "applications/local-1422981780767/storage/rdd/0"
)

// run a bunch of characterization tests -- just verify the behavior is the same as what is saved
@@ -76,48 +76,51 @@ class StorageStatusListenerSuite extends SparkFunSuite {
assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
}

test("task end with updated blocks") {
test("updated blocks") {
val listener = new StorageStatusListener
listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm2, 2000L))
val taskMetrics1 = new TaskMetrics
val taskMetrics2 = new TaskMetrics
val block1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.DISK_ONLY, 0L, 100L, 0L))
val block2 = (RDDBlockId(1, 2), BlockStatus(StorageLevel.DISK_ONLY, 0L, 200L, 0L))
val block3 = (RDDBlockId(4, 0), BlockStatus(StorageLevel.DISK_ONLY, 0L, 300L, 0L))
taskMetrics1.updatedBlocks = Some(Seq(block1, block2))
taskMetrics2.updatedBlocks = Some(Seq(block3))

// Task end with new blocks

val blockUpdateInfos1 = Seq(
BlockUpdatedInfo(bm1, RDDBlockId(1, 1), StorageLevel.DISK_ONLY, 0L, 100L, 0L),
BlockUpdatedInfo(bm1, RDDBlockId(1, 2), StorageLevel.DISK_ONLY, 0L, 200L, 0L)
)
val blockUpdateInfos2 =
Seq(BlockUpdatedInfo(bm2, RDDBlockId(4, 0), StorageLevel.DISK_ONLY, 0L, 300L, 0L))

// Add some new blocks
assert(listener.executorIdToStorageStatus("big").numBlocks === 0)
assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo1, taskMetrics1))
postUpdateBlock(listener, blockUpdateInfos1)
assert(listener.executorIdToStorageStatus("big").numBlocks === 2)
assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 1)))
assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 2)))
assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo2, taskMetrics2))
postUpdateBlock(listener, blockUpdateInfos2)
assert(listener.executorIdToStorageStatus("big").numBlocks === 2)
assert(listener.executorIdToStorageStatus("fat").numBlocks === 1)
assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 1)))
assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 2)))
assert(listener.executorIdToStorageStatus("fat").containsBlock(RDDBlockId(4, 0)))

// Task end with dropped blocks
val droppedBlock1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.NONE, 0L, 0L, 0L))
val droppedBlock2 = (RDDBlockId(1, 2), BlockStatus(StorageLevel.NONE, 0L, 0L, 0L))
val droppedBlock3 = (RDDBlockId(4, 0), BlockStatus(StorageLevel.NONE, 0L, 0L, 0L))
taskMetrics1.updatedBlocks = Some(Seq(droppedBlock1, droppedBlock3))
taskMetrics2.updatedBlocks = Some(Seq(droppedBlock2, droppedBlock3))
// Drop the blocks
val droppedBlockInfo1 = Seq(
BlockUpdatedInfo(bm1, RDDBlockId(1, 1), StorageLevel.NONE, 0L, 0L, 0L),
BlockUpdatedInfo(bm1, RDDBlockId(4, 0), StorageLevel.NONE, 0L, 0L, 0L)
)
val droppedBlockInfo2 = Seq(
BlockUpdatedInfo(bm2, RDDBlockId(1, 2), StorageLevel.NONE, 0L, 0L, 0L),
BlockUpdatedInfo(bm2, RDDBlockId(4, 0), StorageLevel.NONE, 0L, 0L, 0L)
)

listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo1, taskMetrics1))
postUpdateBlock(listener, droppedBlockInfo1)
assert(listener.executorIdToStorageStatus("big").numBlocks === 1)
assert(listener.executorIdToStorageStatus("fat").numBlocks === 1)
assert(!listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 1)))
assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 2)))
assert(listener.executorIdToStorageStatus("fat").containsBlock(RDDBlockId(4, 0)))
listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo2, taskMetrics2))
postUpdateBlock(listener, droppedBlockInfo2)
assert(listener.executorIdToStorageStatus("big").numBlocks === 1)
assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
assert(!listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 1)))
@@ -128,15 +131,14 @@ class StorageStatusListenerSuite extends SparkFunSuite {
test("unpersist RDD") {
val listener = new StorageStatusListener
listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
val taskMetrics1 = new TaskMetrics
val taskMetrics2 = new TaskMetrics
val block1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.DISK_ONLY, 0L, 100L, 0L))
val block2 = (RDDBlockId(1, 2), BlockStatus(StorageLevel.DISK_ONLY, 0L, 200L, 0L))
val block3 = (RDDBlockId(4, 0), BlockStatus(StorageLevel.DISK_ONLY, 0L, 300L, 0L))
taskMetrics1.updatedBlocks = Some(Seq(block1, block2))
taskMetrics2.updatedBlocks = Some(Seq(block3))
listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo1, taskMetrics1))
listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo1, taskMetrics2))
val blockUpdateInfos1 = Seq(
BlockUpdatedInfo(bm1, RDDBlockId(1, 1), StorageLevel.DISK_ONLY, 0L, 100L, 0L),
BlockUpdatedInfo(bm1, RDDBlockId(1, 2), StorageLevel.DISK_ONLY, 0L, 200L, 0L)
)
val blockUpdateInfos2 =
Seq(BlockUpdatedInfo(bm1, RDDBlockId(4, 0), StorageLevel.DISK_ONLY, 0L, 300L, 0L))
postUpdateBlock(listener, blockUpdateInfos1)
postUpdateBlock(listener, blockUpdateInfos2)
assert(listener.executorIdToStorageStatus("big").numBlocks === 3)

// Unpersist RDD
@@ -149,4 +151,11 @@
listener.onUnpersistRDD(SparkListenerUnpersistRDD(1))
assert(listener.executorIdToStorageStatus("big").numBlocks === 0)
}

private def postUpdateBlock(
listener: StorageStatusListener, updateBlockInfos: Seq[BlockUpdatedInfo]): Unit = {
updateBlockInfos.foreach { updateBlockInfo =>
listener.onBlockUpdated(SparkListenerBlockUpdated(updateBlockInfo))
}
}
}
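
The dropped-block assertions above rely on an update at StorageLevel.NONE removing a block rather than recording a zero-sized entry. A sketch of that rule as a pure function, assuming (from StorageStatus semantics) that an invalid storage level means deletion:

```scala
// Apply one block update to a (simplified) block map: a valid level
// upserts the block's size, an invalid one (e.g. StorageLevel.NONE)
// deletes the block entirely.
def applyUpdate(
    blocks: Map[String, Long],
    blockId: String,
    levelIsValid: Boolean,
    size: Long): Map[String, Long] = {
  if (levelIsValid) blocks + (blockId -> size) else blocks - blockId
}
```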
@@ -105,7 +105,7 @@ class StorageTabSuite extends SparkFunSuite with BeforeAndAfter {
assert(storageListener.rddInfoList.size === 0)
}

test("task end") {
test("block update") {
val myRddInfo0 = rddInfo0
val myRddInfo1 = rddInfo1
val myRddInfo2 = rddInfo2
@@ -119,20 +119,14 @@
assert(!storageListener._rddInfoMap(1).isCached)
assert(!storageListener._rddInfoMap(2).isCached)

// Task end with no updated blocks. This should not change anything.
bus.postToAll(SparkListenerTaskEnd(0, 0, "obliteration", Success, taskInfo, new TaskMetrics))
assert(storageListener._rddInfoMap.size === 3)
assert(storageListener.rddInfoList.size === 0)

// Task end with a few new persisted blocks, some from the same RDD
val metrics1 = new TaskMetrics
metrics1.updatedBlocks = Some(Seq(
(RDDBlockId(0, 100), BlockStatus(memAndDisk, 400L, 0L, 0L)),
(RDDBlockId(0, 101), BlockStatus(memAndDisk, 0L, 400L, 0L)),
(RDDBlockId(0, 102), BlockStatus(memAndDisk, 400L, 0L, 200L)),
(RDDBlockId(1, 20), BlockStatus(memAndDisk, 0L, 240L, 0L))
))
bus.postToAll(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo, metrics1))
// Some blocks updated
val blockUpdateInfos = Seq(
BlockUpdatedInfo(bm1, RDDBlockId(0, 100), memAndDisk, 400L, 0L, 0L),
BlockUpdatedInfo(bm1, RDDBlockId(0, 101), memAndDisk, 0L, 400L, 0L),
BlockUpdatedInfo(bm1, RDDBlockId(0, 102), memAndDisk, 400L, 0L, 200L),
BlockUpdatedInfo(bm1, RDDBlockId(1, 20), memAndDisk, 0L, 240L, 0L)
)
postUpdateBlocks(bus, blockUpdateInfos)
assert(storageListener._rddInfoMap(0).memSize === 800L)
assert(storageListener._rddInfoMap(0).diskSize === 400L)
assert(storageListener._rddInfoMap(0).externalBlockStoreSize === 200L)
@@ -146,15 +140,14 @@
assert(!storageListener._rddInfoMap(2).isCached)
assert(storageListener._rddInfoMap(2).numCachedPartitions === 0)

// Task end with a few dropped blocks
val metrics2 = new TaskMetrics
metrics2.updatedBlocks = Some(Seq(
(RDDBlockId(0, 100), BlockStatus(none, 0L, 0L, 0L)),
(RDDBlockId(1, 20), BlockStatus(none, 0L, 0L, 0L)),
(RDDBlockId(2, 40), BlockStatus(none, 0L, 0L, 0L)), // doesn't actually exist
(RDDBlockId(4, 80), BlockStatus(none, 0L, 0L, 0L)) // doesn't actually exist
))
bus.postToAll(SparkListenerTaskEnd(2, 0, "obliteration", Success, taskInfo, metrics2))
// Drop some blocks
val blockUpdateInfos2 = Seq(
BlockUpdatedInfo(bm1, RDDBlockId(0, 100), none, 0L, 0L, 0L),
BlockUpdatedInfo(bm1, RDDBlockId(1, 20), none, 0L, 0L, 0L),
BlockUpdatedInfo(bm1, RDDBlockId(2, 40), none, 0L, 0L, 0L), // doesn't actually exist
BlockUpdatedInfo(bm1, RDDBlockId(4, 80), none, 0L, 0L, 0L) // doesn't actually exist
)
postUpdateBlocks(bus, blockUpdateInfos2)
assert(storageListener._rddInfoMap(0).memSize === 400L)
assert(storageListener._rddInfoMap(0).diskSize === 400L)
assert(storageListener._rddInfoMap(0).externalBlockStoreSize === 200L)
@@ -172,24 +165,27 @@
val rddInfo1 = new RDDInfo(1, "rdd1", 1, memOnly, Seq(4))
val stageInfo0 = new StageInfo(0, 0, "stage0", 1, Seq(rddInfo0), Seq.empty, "details")
val stageInfo1 = new StageInfo(1, 0, "stage1", 1, Seq(rddInfo1), Seq.empty, "details")
val taskMetrics0 = new TaskMetrics
val taskMetrics1 = new TaskMetrics
val block0 = (RDDBlockId(0, 1), BlockStatus(memOnly, 100L, 0L, 0L))
val block1 = (RDDBlockId(1, 1), BlockStatus(memOnly, 200L, 0L, 0L))
taskMetrics0.updatedBlocks = Some(Seq(block0))
taskMetrics1.updatedBlocks = Some(Seq(block1))
val blockUpdateInfos1 = Seq(BlockUpdatedInfo(bm1, RDDBlockId(0, 1), memOnly, 100L, 0L, 0L))
val blockUpdateInfos2 = Seq(BlockUpdatedInfo(bm1, RDDBlockId(1, 1), memOnly, 200L, 0L, 0L))
bus.postToAll(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
bus.postToAll(SparkListenerStageSubmitted(stageInfo0))
assert(storageListener.rddInfoList.size === 0)
bus.postToAll(SparkListenerTaskEnd(0, 0, "big", Success, taskInfo, taskMetrics0))
postUpdateBlocks(bus, blockUpdateInfos1)
assert(storageListener.rddInfoList.size === 1)
bus.postToAll(SparkListenerStageSubmitted(stageInfo1))
assert(storageListener.rddInfoList.size === 1)
bus.postToAll(SparkListenerStageCompleted(stageInfo0))
assert(storageListener.rddInfoList.size === 1)
bus.postToAll(SparkListenerTaskEnd(1, 0, "small", Success, taskInfo1, taskMetrics1))
postUpdateBlocks(bus, blockUpdateInfos2)
assert(storageListener.rddInfoList.size === 2)
bus.postToAll(SparkListenerStageCompleted(stageInfo1))
assert(storageListener.rddInfoList.size === 2)
}

private def postUpdateBlocks(
bus: SparkListenerBus, blockUpdateInfos: Seq[BlockUpdatedInfo]): Unit = {
blockUpdateInfos.foreach { blockUpdateInfo =>
bus.postToAll(SparkListenerBlockUpdated(blockUpdateInfo))
}
}
}
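
Both suites route events through bus.postToAll via their post helpers. As a rough, hypothetical miniature of what that depends on (stand-in types, not the Spark implementation): a listener bus of this kind delivers each posted event to every registered listener, in registration order.

```scala
// Minimal stand-in for a listener bus: listeners register once, and
// every posted event is fanned out to all of them in order.
trait MiniListener { def onEvent(e: Any): Unit }

class MiniBus {
  private var listeners = Vector.empty[MiniListener]
  def addListener(l: MiniListener): Unit = listeners :+= l
  def postToAll(e: Any): Unit = listeners.foreach(_.onEvent(e))
}
```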