64 changes: 48 additions & 16 deletions core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -646,7 +646,37 @@ private[spark] class AppStatusListener(
   }
 
   override def onUnpersistRDD(event: SparkListenerUnpersistRDD): Unit = {
-    liveRDDs.remove(event.rddId)
+    liveRDDs.remove(event.rddId).foreach { liveRDD =>
+      val storageLevel = liveRDD.info.storageLevel
+
+      // Use RDD partition info to update executor block info.
+      liveRDD.getPartitions().foreach { case (_, part) =>
+        part.executors.foreach { executorId =>
+          liveExecutors.get(executorId).foreach { exec =>
+            exec.rddBlocks = exec.rddBlocks - 1
+          }
+        }
+      }
+
+      val now = System.nanoTime()
+
+      // Use RDD distribution to update executor memory and disk usage info.
+      liveRDD.getDistributions().foreach { case (executorId, rddDist) =>
+        liveExecutors.get(executorId).foreach { exec =>
+          if (exec.hasMemoryInfo) {
+            if (storageLevel.useOffHeap) {
+              exec.usedOffHeap = addDeltaToValue(exec.usedOffHeap, -rddDist.offHeapUsed)
+            } else {
+              exec.usedOnHeap = addDeltaToValue(exec.usedOnHeap, -rddDist.onHeapUsed)
+            }
+          }
+          exec.memoryUsed = addDeltaToValue(exec.memoryUsed, -rddDist.memoryUsed)
+          exec.diskUsed = addDeltaToValue(exec.diskUsed, -rddDist.diskUsed)
+          maybeUpdate(exec, now)
+        }
+      }
+    }
+
     kvstore.delete(classOf[RDDStorageInfoWrapper], event.rddId)
   }
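The rewritten handler subtracts the unpersisted RDD's footprint from each executor's summary, clamping every update at zero through `addDeltaToValue` so accounting drift can never push usage negative. Below is a minimal, self-contained sketch of that clamped-delta pattern; `ExecUsage` and `RddDist` are hypothetical stand-ins for `LiveExecutor` and `LiveRDDDistribution`, not Spark's classes.

```scala
object UnpersistAccountingSketch {
  // Clamp at zero: bookkeeping drift must never yield negative usage.
  def addDeltaToValue(old: Long, delta: Long): Long = math.max(0, old + delta)

  final case class ExecUsage(memoryUsed: Long, diskUsed: Long)
  final case class RddDist(memoryUsed: Long, diskUsed: Long)

  // Subtract an unpersisted RDD's per-executor footprint from the totals.
  def onUnpersist(exec: ExecUsage, dist: RddDist): ExecUsage =
    ExecUsage(
      addDeltaToValue(exec.memoryUsed, -dist.memoryUsed),
      addDeltaToValue(exec.diskUsed, -dist.diskUsed))

  def main(args: Array[String]): Unit = {
    val exec = ExecUsage(memoryUsed = 1000L, diskUsed = 500L)
    val dist = RddDist(memoryUsed = 600L, diskUsed = 700L) // disk over-counted
    println(onUnpersist(exec, dist)) // prints ExecUsage(400,0)
  }
}
```

Note how the disk total clamps to zero even though the distribution claims more disk than the executor recorded; that is exactly the drift the clamp guards against, and why the PR routes every update through `addDeltaToValue`.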

@@ -705,6 +735,11 @@ private[spark] class AppStatusListener(
       .sortBy(_.stageId)
   }
 
+  /**
+   * Apply a delta to a value, but ensure that it doesn't go negative.
+   */
+  private def addDeltaToValue(old: Long, delta: Long): Long = math.max(0, old + delta)
+
   private def updateRDDBlock(event: SparkListenerBlockUpdated, block: RDDBlockId): Unit = {
     val now = System.nanoTime()
     val executorId = event.blockUpdatedInfo.blockManagerId.executorId
@@ -714,9 +749,6 @@
     val diskDelta = event.blockUpdatedInfo.diskSize * (if (storageLevel.useDisk) 1 else -1)
     val memoryDelta = event.blockUpdatedInfo.memSize * (if (storageLevel.useMemory) 1 else -1)
 
-    // Function to apply a delta to a value, but ensure that it doesn't go negative.
-    def newValue(old: Long, delta: Long): Long = math.max(0, old + delta)
-
     val updatedStorageLevel = if (storageLevel.isValid) {
       Some(storageLevel.description)
     } else {
@@ -733,13 +765,13 @@
     maybeExec.foreach { exec =>
       if (exec.hasMemoryInfo) {
         if (storageLevel.useOffHeap) {
-          exec.usedOffHeap = newValue(exec.usedOffHeap, memoryDelta)
+          exec.usedOffHeap = addDeltaToValue(exec.usedOffHeap, memoryDelta)
         } else {
-          exec.usedOnHeap = newValue(exec.usedOnHeap, memoryDelta)
+          exec.usedOnHeap = addDeltaToValue(exec.usedOnHeap, memoryDelta)
         }
       }
-      exec.memoryUsed = newValue(exec.memoryUsed, memoryDelta)
-      exec.diskUsed = newValue(exec.diskUsed, diskDelta)
+      exec.memoryUsed = addDeltaToValue(exec.memoryUsed, memoryDelta)
+      exec.diskUsed = addDeltaToValue(exec.diskUsed, diskDelta)
     }
 
     // Update the block entry in the RDD info, keeping track of the deltas above so that we
@@ -767,23 +799,23 @@
     // Only update the partition if it's still stored in some executor, otherwise get rid of it.
     if (executors.nonEmpty) {
       partition.update(executors, rdd.storageLevel,
-        newValue(partition.memoryUsed, memoryDelta),
-        newValue(partition.diskUsed, diskDelta))
+        addDeltaToValue(partition.memoryUsed, memoryDelta),
+        addDeltaToValue(partition.diskUsed, diskDelta))
     } else {
       rdd.removePartition(block.name)
     }
 
     maybeExec.foreach { exec =>
       if (exec.rddBlocks + rddBlocksDelta > 0) {
         val dist = rdd.distribution(exec)
-        dist.memoryUsed = newValue(dist.memoryUsed, memoryDelta)
-        dist.diskUsed = newValue(dist.diskUsed, diskDelta)
+        dist.memoryUsed = addDeltaToValue(dist.memoryUsed, memoryDelta)
+        dist.diskUsed = addDeltaToValue(dist.diskUsed, diskDelta)
 
         if (exec.hasMemoryInfo) {
           if (storageLevel.useOffHeap) {
-            dist.offHeapUsed = newValue(dist.offHeapUsed, memoryDelta)
+            dist.offHeapUsed = addDeltaToValue(dist.offHeapUsed, memoryDelta)
           } else {
-            dist.onHeapUsed = newValue(dist.onHeapUsed, memoryDelta)
+            dist.onHeapUsed = addDeltaToValue(dist.onHeapUsed, memoryDelta)
           }
         }
         dist.lastUpdate = null
@@ -802,8 +834,8 @@
       }
     }
 
-    rdd.memoryUsed = newValue(rdd.memoryUsed, memoryDelta)
-    rdd.diskUsed = newValue(rdd.diskUsed, diskDelta)
+    rdd.memoryUsed = addDeltaToValue(rdd.memoryUsed, memoryDelta)
+    rdd.diskUsed = addDeltaToValue(rdd.diskUsed, diskDelta)
     update(rdd, now)
   }

4 changes: 4 additions & 0 deletions core/src/main/scala/org/apache/spark/status/LiveEntity.scala
@@ -538,6 +538,10 @@ private class LiveRDD(val info: RDDInfo) extends LiveEntity {
     distributions.get(exec.executorId)
   }
 
+  def getPartitions(): scala.collection.Map[String, LiveRDDPartition] = partitions
+
+  def getDistributions(): scala.collection.Map[String, LiveRDDDistribution] = distributions
+
   override protected def doUpdate(): Any = {
     val dists = if (distributions.nonEmpty) {
       Some(distributions.values.map(_.toApi()).toSeq)

Contributor (reviewing the new accessors): I'd have just exposed the fields but this is fine too.
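For contrast with the reviewer's suggestion, here is a sketch of the two visibility choices under simplified, assumed types (this is not the real LiveEntity code). The accessor style the PR uses keeps the mutable map private and hands callers the read-only `scala.collection.Map` interface; exposing the field would hand out the mutable map itself.

```scala
import scala.collection.mutable

// Simplified stand-in for LiveRDD, contrasting the two visibility choices.
class LiveRDDish {
  // Accessor style (what the PR does): the mutable map stays private and
  // callers only see the read-only scala.collection.Map interface.
  private val partitions = new mutable.HashMap[String, Long]()
  def getPartitions(): scala.collection.Map[String, Long] = partitions

  // Field style (the reviewer's alternative) would instead be:
  //   val partitions = new mutable.HashMap[String, Long]()
  // which exposes add/remove to every caller.

  def setPartition(name: String, size: Long): Unit = partitions(name) = size
}

object LiveRDDishDemo {
  def main(args: Array[String]): Unit = {
    val rdd = new LiveRDDish
    rdd.setPartition("rdd_1_0", 1024L)
    println(rdd.getPartitions().size) // 1; readable here, but not mutable
  }
}
```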
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
@@ -55,7 +55,7 @@ class RDDInfo(
 }
 
 private[spark] object RDDInfo {
-  private val callsiteForm = SparkEnv.get.conf.get(EVENT_LOG_CALLSITE_FORM)
+  private lazy val callsiteForm = SparkEnv.get.conf.get(EVENT_LOG_CALLSITE_FORM)
 
   def fromRdd(rdd: RDD[_]): RDDInfo = {
     val rddName = Option(rdd.name).getOrElse(Utils.getFormattedClassName(rdd))

Member (reviewing the lazy val): Is this related to the problem?

Member (author): I ran the test locally, and the non-lazy initialization causes an error when initializing RDDInfo. Actually, I think this should be lazy because it is not always needed.

Member: Thanks for explaining.
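On the lazy-val exchange above, a minimal sketch of the failure mode, assuming a simplified `EnvHolder` in place of `SparkEnv` (hypothetical names throughout): a strict `val` in a companion object is evaluated as soon as the object is first touched, so it dereferences a null `SparkEnv.get` in tests that build `RDDInfo` without a running SparkEnv; a `lazy val` defers that read until `callsiteForm` is actually used.

```scala
object LazyInitSketch {
  final class Env(val conf: Map[String, String])
  object EnvHolder { var get: Env = null } // stands in for SparkEnv.get being unset

  object Strict {
    // A strict val runs as soon as Strict is first touched: NPE while unset.
    val form: String = EnvHolder.get.conf("callsiteForm")
  }

  object Lazy {
    // A lazy val is evaluated on first read of `form`, not at object init.
    lazy val form: String = EnvHolder.get.conf("callsiteForm")
  }

  def main(args: Array[String]): Unit = {
    Lazy // initializing the object alone is safe; the lazy val stays untouched
    EnvHolder.get = new Env(Map("callsiteForm" -> "short"))
    println(Lazy.form) // "short" -- read after the env became available
    // Touching Strict before setting EnvHolder.get would instead have thrown
    // a NullPointerException during its initialization.
  }
}
```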
@@ -881,12 +881,41 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
       assert(dist.memoryRemaining === maxMemory - rdd2b1.memSize - rdd1b2.memSize)
     }
 
+    // Add block1 of rdd1 back to bm 1.
+    listener.onBlockUpdated(SparkListenerBlockUpdated(
+      BlockUpdatedInfo(bm1, rdd1b1.blockId, level, rdd1b1.memSize, rdd1b1.diskSize)))
+
+    check[ExecutorSummaryWrapper](bm1.executorId) { exec =>
+      assert(exec.info.rddBlocks === 3L)
+      assert(exec.info.memoryUsed === rdd1b1.memSize + rdd1b2.memSize + rdd2b1.memSize)
+      assert(exec.info.diskUsed === rdd1b1.diskSize + rdd1b2.diskSize + rdd2b1.diskSize)
+    }
+
+    // Unpersist RDD1.
+    listener.onUnpersistRDD(SparkListenerUnpersistRDD(rdd1b1.rddId))
+    intercept[NoSuchElementException] {
+      check[RDDStorageInfoWrapper](rdd1b1.rddId) { _ => () }
+    }
+
+    // executor1 now only contains block1 from rdd2.
+    check[ExecutorSummaryWrapper](bm1.executorId) { exec =>
+      assert(exec.info.rddBlocks === 1L)
+      assert(exec.info.memoryUsed === rdd2b1.memSize)
+      assert(exec.info.diskUsed === rdd2b1.diskSize)
+    }
+
+    // Unpersist RDD2.
+    listener.onUnpersistRDD(SparkListenerUnpersistRDD(rdd2b1.rddId))
+    intercept[NoSuchElementException] {
+      check[RDDStorageInfoWrapper](rdd2b1.rddId) { _ => () }
+    }
+
+    check[ExecutorSummaryWrapper](bm1.executorId) { exec =>
+      assert(exec.info.rddBlocks === 0L)
+      assert(exec.info.memoryUsed === 0)
+      assert(exec.info.diskUsed === 0)
+    }
+
     // Update a StreamBlock.
     val stream1 = StreamBlockId(1, 1L)
     listener.onBlockUpdated(SparkListenerBlockUpdated(
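For readers unfamiliar with the suite's helpers: `check` reads a wrapper out of the listener's KVStore by key and runs the assertion against it, so a deleted entry surfaces as `NoSuchElementException` in the `intercept` blocks above. A hedged sketch of that shape, reconstructed from how the tests call it rather than copied from the suite, with a fake store standing in for Spark's KVStore:

```scala
import scala.reflect.{ClassTag, classTag}

object CheckHelperSketch {
  // Stand-in for the listener's KVStore: read throws NoSuchElementException
  // when the key is absent, just like a missing RDDStorageInfoWrapper above.
  final class FakeStore(entries: Map[(Class[_], Any), Any]) {
    def read(klass: Class[_], key: Any): Any =
      entries.getOrElse((klass, key), throw new NoSuchElementException(s"$key"))
  }

  val store = new FakeStore(Map((classOf[String], "present") -> "value"))

  // Assumed shape of the suite's `check` helper, inferred from usage:
  // read a wrapper from the store by key and apply the assertion to it.
  def check[T: ClassTag](key: Any)(fn: T => Unit): Unit = {
    val value = store.read(classTag[T].runtimeClass, key).asInstanceOf[T]
    fn(value)
  }

  def main(args: Array[String]): Unit = {
    check[String]("present") { v => assert(v == "value") } // passes
    try check[String]("deleted") { _ => () }
    catch { case _: NoSuchElementException => println("deleted entry throws") }
  }
}
```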