core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -234,8 +234,8 @@ private[spark] class AppStatusListener(
             (partition.memoryUsed / partition.executors.length) * -1)
           rdd.diskUsed = addDeltaToValue(rdd.diskUsed,
             (partition.diskUsed / partition.executors.length) * -1)
-          partition.update(partition.executors
-            .filter(!_.equals(event.executorId)), rdd.storageLevel,
+          partition.update(
+            partition.executors.filter(!_.equals(event.executorId)),
             addDeltaToValue(partition.memoryUsed,
               (partition.memoryUsed / partition.executors.length) * -1),
             addDeltaToValue(partition.diskUsed,
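As a quick sanity check on the accounting in this hunk, here is a hedged walk-through with invented numbers; `addDeltaToValue` is sketched to clamp at zero, which is an assumption about the listener's private helper. A partition replicated on two executors gives back one executor's integer share of its usage when that executor is removed.

```scala
object ShareAccountingSketch extends App {
  // Assumed to mirror AppStatusListener's private helper: never go below zero.
  def addDeltaToValue(old: Long, delta: Long): Long = math.max(0, old + delta)

  val memoryUsed = 100L                    // bytes the partition holds in memory (invented)
  val executors = Seq("exec-1", "exec-2")  // replicas before the removal (hypothetical)

  // One executor is removed: subtract its (integer-division) share of the usage.
  val delta = (memoryUsed / executors.length) * -1    // -50
  val remaining = addDeltaToValue(memoryUsed, delta)  // 50
  println(s"delta = $delta, remaining = $remaining")
}
```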
@@ -495,7 +495,7 @@ private[spark] class AppStatusListener(
 
     event.stageInfo.rddInfos.foreach { info =>
       if (info.storageLevel.isValid) {
-        liveUpdate(liveRDDs.getOrElseUpdate(info.id, new LiveRDD(info)), now)
+        liveUpdate(liveRDDs.getOrElseUpdate(info.id, new LiveRDD(info, info.storageLevel)), now)
       }
     }
 
@@ -916,12 +916,6 @@ private[spark] class AppStatusListener(
     val diskDelta = event.blockUpdatedInfo.diskSize * (if (storageLevel.useDisk) 1 else -1)
     val memoryDelta = event.blockUpdatedInfo.memSize * (if (storageLevel.useMemory) 1 else -1)
 
-    val updatedStorageLevel = if (storageLevel.isValid) {
-      Some(storageLevel.description)
-    } else {
-      None
-    }
-
     // We need information about the executor to update some memory accounting values in the
     // RDD info, so read that beforehand.
     val maybeExec = liveExecutors.get(executorId)
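To make the sign convention in the two delta lines concrete, a minimal sketch with invented sizes (assumes a Spark dependency on the classpath for `StorageLevel`; everything else is hypothetical): moving a block to `DISK_ONLY` yields a positive disk delta and a negative memory delta.

```scala
import org.apache.spark.storage.StorageLevel

object DeltaSketch extends App {
  // Hypothetical block update; sizes are invented.
  val storageLevel = StorageLevel.DISK_ONLY  // useDisk = true, useMemory = false
  val memSize = 10L
  val diskSize = 20L

  // Same expressions as in the hunk above.
  val diskDelta = diskSize * (if (storageLevel.useDisk) 1 else -1)     // +20
  val memoryDelta = memSize * (if (storageLevel.useMemory) 1 else -1)  // -10
  println(s"diskDelta = $diskDelta, memoryDelta = $memoryDelta")
}
```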
@@ -936,13 +930,9 @@ private[spark] class AppStatusListener(
     // Update the block entry in the RDD info, keeping track of the deltas above so that we
     // can update the executor information too.
     liveRDDs.get(block.rddId).foreach { rdd =>
-      if (updatedStorageLevel.isDefined) {
-        rdd.setStorageLevel(updatedStorageLevel.get)
-      }
-
       val partition = rdd.partition(block.name)
 
-      val executors = if (updatedStorageLevel.isDefined) {
+      val executors = if (storageLevel.isValid) {
         val current = partition.executors
         if (current.contains(executorId)) {
           current
@@ -957,7 +947,7 @@ private[spark] class AppStatusListener(
 
       // Only update the partition if it's still stored in some executor, otherwise get rid of it.
       if (executors.nonEmpty) {
-        partition.update(executors, rdd.storageLevel,
+        partition.update(executors,
           addDeltaToValue(partition.memoryUsed, memoryDelta),
           addDeltaToValue(partition.diskUsed, diskDelta))
       } else {
37 changes: 24 additions & 13 deletions core/src/main/scala/org/apache/spark/status/LiveEntity.scala
@@ -30,7 +30,7 @@ import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
 import org.apache.spark.resource.ResourceInformation
 import org.apache.spark.scheduler.{AccumulableInfo, StageInfo, TaskInfo}
 import org.apache.spark.status.api.v1
-import org.apache.spark.storage.RDDInfo
+import org.apache.spark.storage.{RDDInfo, StorageLevel}
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.AccumulatorContext
 import org.apache.spark.util.collection.OpenHashSet
@@ -458,7 +458,13 @@ private class LiveStage extends LiveEntity {
 
 }
 
-private class LiveRDDPartition(val blockName: String) {
+/**
+ * Data about a single partition of a cached RDD. The RDD storage level is used to compute the
+ * effective storage level of the partition, which takes into account the storage actually being
+ * used by the partition in the executors, and thus may differ from the storage level requested
+ * by the application.
+ */
+private class LiveRDDPartition(val blockName: String, rddLevel: StorageLevel) {
Contributor: It would help me a bit if this was called 'requestedStorageLevel' and in
RDDPartitionInfo it was called 'effectiveStorageLevel'. I guess it's not worth changing
RDDPartitionInfo, but maybe just a comment along those lines.

Author: Sure.

   import LiveEntityHelpers._
 
@@ -476,12 +482,13 @@
 
   def update(
       executors: Seq[String],
-      storageLevel: String,
       memoryUsed: Long,
       diskUsed: Long): Unit = {
+    val level = StorageLevel(diskUsed > 0, memoryUsed > 0, rddLevel.useOffHeap,
+      if (memoryUsed > 0) rddLevel.deserialized else false, executors.size)
     value = new v1.RDDPartitionInfo(
       blockName,
-      weakIntern(storageLevel),
+      weakIntern(level.description),
       memoryUsed,
       diskUsed,
       executors)
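A sketch of what the effective-level computation above produces (invented values; assumes Spark's `StorageLevel` factory and `description`, which this patch also relies on): a partition requested as MEMORY_AND_DISK but currently evicted to disk reports a disk-only, serialized effective level.

```scala
import org.apache.spark.storage.StorageLevel

object EffectiveLevelSketch extends App {
  val rddLevel = StorageLevel.MEMORY_AND_DISK  // requested level: memory + disk, deserialized
  val memoryUsed = 0L                          // invented: the block was evicted from memory
  val diskUsed = 1024L
  val executors = Seq("exec-1")                // hypothetical single replica

  // Same expression as LiveRDDPartition.update above.
  val level = StorageLevel(diskUsed > 0, memoryUsed > 0, rddLevel.useOffHeap,
    if (memoryUsed > 0) rddLevel.deserialized else false, executors.size)
  println(level.description)  // expected: "Disk Serialized 1x Replicated"
}
```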
@@ -520,27 +527,31 @@ private class LiveRDDDistribution(exec: LiveExecutor) {
 
 }
 
-private class LiveRDD(val info: RDDInfo) extends LiveEntity {
+/**
+ * Tracker for data related to a persisted RDD.
+ *
+ * The RDD storage level is immutable, following the current behavior of `RDD.persist()`, even
+ * though it is mutable in the `RDDInfo` structure. Since the listener does not track unpersisted
+ * RDDs, this covers the case where an early stage is run on the unpersisted RDD, and a later
+ * stage is started after the RDD is marked for caching.
+ */
Contributor: A stage may have been submitted before the RDD was persisted at all, then another
stage submitted after the RDD is persisted, so it's not actually immutable. You wouldn't
properly capture that here. (Probably not the optimal thing for the user to do, but I've seen
weirder things...)

Author: Actually that works fine, because the listener does not track RDDs that are not
persisted; so there wouldn't be a live RDD for the first stage in your example. It would be
created when the second stage is submitted, and at that point the storage level cannot be
changed further. I'll update the comment (maybe even add a unit test).

Contributor: Makes sense, thanks for adding the test too.
+private class LiveRDD(val info: RDDInfo, storageLevel: StorageLevel) extends LiveEntity {
 
   import LiveEntityHelpers._
 
-  var storageLevel: String = weakIntern(info.storageLevel.description)
   var memoryUsed = 0L
   var diskUsed = 0L
 
+  private val levelDescription = weakIntern(storageLevel.description)
   private val partitions = new HashMap[String, LiveRDDPartition]()
   private val partitionSeq = new RDDPartitionSeq()
 
   private val distributions = new HashMap[String, LiveRDDDistribution]()
 
-  def setStorageLevel(level: String): Unit = {
-    this.storageLevel = weakIntern(level)
-  }
-
   def partition(blockName: String): LiveRDDPartition = {
     partitions.getOrElseUpdate(blockName, {
-      val part = new LiveRDDPartition(blockName)
-      part.update(Nil, storageLevel, 0L, 0L)
+      val part = new LiveRDDPartition(blockName, storageLevel)
+      part.update(Nil, 0L, 0L)
       partitionSeq.addPartition(part)
       part
     })
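For context, the user-level scenario that the doc comment and the thread above describe might look like the following hedged snippet (assumes a local Spark build on the classpath; names and values are invented). `RDD.persist()` refuses to change a storage level once one is assigned, which is what makes capturing the level at `LiveRDD` creation time safe.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object PersistAfterStageSketch extends App {
  val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("sketch"))
  val rdd = sc.parallelize(1 to 100)
  rdd.count()                               // stage 0: RDD unpersisted, so no LiveRDD is tracked
  rdd.persist(StorageLevel.MEMORY_AND_DISK) // the level cannot be changed after this point
  rdd.count()                               // stage 1: LiveRDD is created with MEMORY_AND_DISK
  sc.stop()
}
```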
@@ -578,7 +589,7 @@ private class LiveRDD(val info: RDDInfo) extends LiveEntity {
       info.name,
       info.numPartitions,
       partitions.size,
-      storageLevel,
+      levelDescription,
       memoryUsed,
       diskUsed,
       dists,
core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
@@ -42,6 +42,8 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
     .set(LIVE_ENTITY_UPDATE_PERIOD, 0L)
     .set(ASYNC_TRACKING_ENABLED, false)
 
+  private val twoReplicaMemAndDiskLevel = StorageLevel(true, true, false, true, 2)
+
   private var time: Long = _
   private var testDir: File = _
   private var store: ElementTrackingStore = _
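As an aside on what this helper denotes (a hedged REPL-style check; the exact description string is an assumption based on `StorageLevel.description`'s formatting): it is MEMORY_AND_DISK with replication 2, and its description is the string the new assertions below compare against.

```scala
import org.apache.spark.storage.StorageLevel

// Hypothetical REPL check, not part of the patch.
val twoReplicaMemAndDiskLevel = StorageLevel(true, true, false, true, 2)
println(twoReplicaMemAndDiskLevel.description)  // presumably "Disk Memory Deserialized 2x Replicated"
```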
@@ -697,8 +699,16 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
     val rdd2b1 = RddBlock(2, 1, 5L, 6L)
     val level = StorageLevel.MEMORY_AND_DISK
 
+    // Submit a stage for the first RDD before it's marked for caching, to make sure later
+    // the listener picks up the correct storage level.
+    val rdd1Info = new RDDInfo(rdd1b1.rddId, "rdd1", 2, StorageLevel.NONE, false, Nil)
+    val stage0 = new StageInfo(0, 0, "stage0", 4, Seq(rdd1Info), Nil, "details0")
+    listener.onStageSubmitted(SparkListenerStageSubmitted(stage0, new Properties()))
+    listener.onStageCompleted(SparkListenerStageCompleted(stage0))
+    assert(store.count(classOf[RDDStorageInfoWrapper]) === 0)
+
     // Submit a stage and make sure the RDDs are recorded.
-    val rdd1Info = new RDDInfo(rdd1b1.rddId, "rdd1", 2, level, false, Nil)
+    rdd1Info.storageLevel = level
     val rdd2Info = new RDDInfo(rdd2b1.rddId, "rdd2", 1, level, false, Nil)
     val stage = new StageInfo(1, 0, "stage1", 4, Seq(rdd1Info, rdd2Info), Nil, "details1")
     listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new Properties()))
@@ -763,6 +773,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
       assert(part.memoryUsed === rdd1b1.memSize * 2)
Contributor: Unrelated to your change, but for the line just above
(wrapper.info.partitions.get(0)), would it be better to have
wrapper.info.partitions.get.find(_.blockName == rdd1b1.blockId.name)? Or at least an assert
after that, that you have the right block? Otherwise it will lead to confusing errors later if
somebody changes the test and rdd1b1 isn't added first.

Also unrelated to your change, but coming back to this code after a long time I was a little
surprised that info.partitions isn't ordered by the partition; that may also be worth a comment
somewhere.

Author: Sure, I can add an assert. The list is not ordered for performance reasons. See
RDDPartitionSeq in LiveEntity.scala.

Author: (Actually the assert / change is not needed, since a few lines above there's an
assertion that the list has a single element.)

Contributor: OK, fair enough. I did realize why RDDPartitionSeq is the way it is after a closer
look; I was just mentioning that it's not so obvious, or commented in the places I expected
(the API just exposes a Seq[RDDPartitionInfo]).
       assert(part.diskUsed === rdd1b1.diskSize * 2)
       assert(part.executors === Seq(bm1.executorId, bm2.executorId))
+      assert(part.storageLevel === twoReplicaMemAndDiskLevel.description)
     }
 
     check[ExecutorSummaryWrapper](bm2.executorId) { exec =>
@@ -800,9 +811,30 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
       assert(exec.info.diskUsed === rdd1b1.diskSize + rdd1b2.diskSize)
     }
 
-    // Remove block 1 from bm 1.
+    // Evict block 1 from memory in bm 1. Note that because of SPARK-29319, the disk size
+    // is reported as "0" here to avoid double-counting; the current behavior of the block
+    // manager is to provide the actual disk size of the block.
+    listener.onBlockUpdated(SparkListenerBlockUpdated(
+      BlockUpdatedInfo(bm1, rdd1b1.blockId, StorageLevel.DISK_ONLY,
+        rdd1b1.memSize, 0L)))
+
+    check[RDDStorageInfoWrapper](rdd1b1.rddId) { wrapper =>
+      assert(wrapper.info.numCachedPartitions === 2L)
+      assert(wrapper.info.memoryUsed === rdd1b1.memSize + rdd1b2.memSize)
+      assert(wrapper.info.diskUsed === 2 * rdd1b1.diskSize + rdd1b2.diskSize)
+      assert(wrapper.info.dataDistribution.get.size === 2L)
+      assert(wrapper.info.partitions.get.size === 2L)
+    }
+
+    check[ExecutorSummaryWrapper](bm1.executorId) { exec =>
+      assert(exec.info.rddBlocks === 2L)
+      assert(exec.info.memoryUsed === rdd1b2.memSize)
+      assert(exec.info.diskUsed === rdd1b1.diskSize + rdd1b2.diskSize)
+    }
+
+    // Remove block 1 from bm 1; note memSize = 0 due to the eviction above.
     listener.onBlockUpdated(SparkListenerBlockUpdated(
-      BlockUpdatedInfo(bm1, rdd1b1.blockId, StorageLevel.NONE, rdd1b1.memSize, rdd1b1.diskSize)))
+      BlockUpdatedInfo(bm1, rdd1b1.blockId, StorageLevel.NONE, 0, rdd1b1.diskSize)))
 
     check[RDDStorageInfoWrapper](rdd1b1.rddId) { wrapper =>
       assert(wrapper.info.numCachedPartitions === 2L)
@@ -1571,7 +1603,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
       assert(dist.memoryRemaining === maxMemory - dist.memoryUsed)
 
       val part1 = wrapper.info.partitions.get.find(_.blockName === rdd1b1.blockId.name).get
-      assert(part1.storageLevel === level.description)
+      assert(part1.storageLevel === twoReplicaMemAndDiskLevel.description)
       assert(part1.memoryUsed === 2 * rdd1b1.memSize)
       assert(part1.diskUsed === 2 * rdd1b1.diskSize)
       assert(part1.executors === Seq(bm1.executorId, bm2.executorId))
core/src/test/scala/org/apache/spark/status/LiveEntitySuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.status
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.storage.StorageLevel
 
 class LiveEntitySuite extends SparkFunSuite {
 
@@ -59,8 +60,8 @@ class LiveEntitySuite extends SparkFunSuite {
   }
 
   private def newPartition(i: Int): LiveRDDPartition = {
-    val part = new LiveRDDPartition(i.toString)
-    part.update(Seq(i.toString), i.toString, i, i)
+    val part = new LiveRDDPartition(i.toString, StorageLevel.MEMORY_AND_DISK)
+    part.update(Seq(i.toString), i, i)
     part
   }
 