core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -646,8 +646,17 @@ private[spark] class AppStatusListener(
   }
 
   override def onUnpersistRDD(event: SparkListenerUnpersistRDD): Unit = {
-    liveRDDs.remove(event.rddId)
-    kvstore.delete(classOf[RDDStorageInfoWrapper], event.rddId)
+    while (true) {
+      liveRDDs.get(event.rddId) match {
+        case Some(rdd) =>
+          if (rdd.isEmpty()) {
+            liveRDDs.remove(event.rddId)
+            kvstore.delete(classOf[RDDStorageInfoWrapper], event.rddId)
+          }
+        case None =>
+          return
+      }
+    }
   }
 
   override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = {

Review comments on the new loop:

@kiszk (Member), Sep 5, 2018:
This code may cause an infinite loop. Since liveRDDs is just a HashMap (not a ConcurrentHashMap or similar), updates made to liveRDDs by other threads may not become visible inside this loop.

Contributor:
There's a single thread that calls this method (and every other listener method in this class). But I'm kind of wondering what the point of the loop is in the first place... It seems you'll get into an infinite loop if for some reason rdd.isEmpty() returns false.

@kiszk (Member):
Thank you for letting me know that a single thread calls this. I agree with your point: as you said, if rdd.isEmpty() keeps returning false, this will spin forever. I had assumed the loop was meant to cope with multiple threads.

Author:
Hi kiszk, thank you for the reminder about thread safety.
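
For reference, a minimal sketch of the loop-free version the discussion points toward: the listener is driven by a single thread, so the callback can inspect the RDD once and return. This is an illustration only, not the PR's diff or the merged code; it assumes the isEmpty() helper this PR adds to LiveRDD and the existing liveRDDs / kvstore members of AppStatusListener.

// Sketch only: the listener is single-threaded, so no retry loop is needed.
override def onUnpersistRDD(event: SparkListenerUnpersistRDD): Unit = {
  liveRDDs.get(event.rddId).foreach { rdd =>
    // Drop the RDD from the live set and the KV store only once all of its blocks are gone.
    if (rdd.isEmpty()) {
      liveRDDs.remove(event.rddId)
      kvstore.delete(classOf[RDDStorageInfoWrapper], event.rddId)
    }
  }
}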
@@ -710,9 +719,21 @@ private[spark] class AppStatusListener(
     val executorId = event.blockUpdatedInfo.blockManagerId.executorId
 
     // Whether values are being added to or removed from the existing accounting.
+    // BlockManager always sends an empty block status message when the user removes an RDD block,
+    // so we get the removed block's size from the RDD partition to keep memory/disk sizes accurate.
     val storageLevel = event.blockUpdatedInfo.storageLevel
-    val diskDelta = event.blockUpdatedInfo.diskSize * (if (storageLevel.useDisk) 1 else -1)
-    val memoryDelta = event.blockUpdatedInfo.memSize * (if (storageLevel.useMemory) 1 else -1)
+    val diskDelta: Long = storageLevel != StorageLevel.NONE match {
+      case true => event.blockUpdatedInfo.diskSize
+      case false => liveRDDs.get(block.rddId).map { rdd =>
+        rdd.partition(block.name).diskUsed * (-1)
+      }.get
+    }
+    val memoryDelta = storageLevel != StorageLevel.NONE match {
+      case true => event.blockUpdatedInfo.memSize
+      case false => liveRDDs.get(block.rddId).map { rdd =>
+        rdd.partition(block.name).memoryUsed * (-1)
+      }.get
+    }
 
     // Function to apply a delta to a value, but ensure that it doesn't go negative.
     def newValue(old: Long, delta: Long): Long = math.max(0, old + delta)

Review comments on the new delta computation:

Contributor (vanzin):
A simple if is much clearer (and probably faster) here. But see my later comment: I believe it's better for this code to live in the "unpersist" callback and to leave the block manager code unchanged.

This code is also wrong as written: if you're changing the storage level of an RDD (e.g. it was cached on disk, but after the update it's not), it now does the wrong thing.

(So, another argument for treating the "unpersist" event differently, in the appropriate callback.)

Author:
Hi vanzin, thanks a lot for your advice. I will now try to modify only the unpersist callback in AppStatusListener. I still have some questions; could you help explain:
1. As I understand it, a Spark task runs against a single RDD partition in one executor process, so why does LiveRDDPartition hold a Seq of executors rather than a single executor?
2. I believe we also need to reduce the RDD block count shown on the Executors tab when an RDD is unpersisted, but that count is only held by LiveExecutor, which tracks blocks across all RDDs. Looking at line 842 of AppStatusListener (https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala#L842), I think the condition 'exec.rddBlocks + rddBlocksDelta > 0' needs to become 'LiveRDDDistribution.rddBlocks + rddBlocksDelta > 0', i.e. use LiveRDDDistribution to track the block count for a particular RDD on a particular executor. Is that OK?
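
As a readability illustration of the reviewer's "simple if" suggestion, the same deltas can be written without matching on a Boolean. This is a sketch, not the PR's code: block is the RDDBlockId already parsed earlier in onBlockUpdated, and the getOrElse(0L) fallbacks are an added assumption for the case where the RDD is no longer tracked.

// Sketch only: equivalent logic to the Boolean match above, as plain if/else.
val diskDelta: Long =
  if (storageLevel != StorageLevel.NONE) {
    event.blockUpdatedInfo.diskSize
  } else {
    // Removal events report size 0, so subtract the size previously tracked for this partition.
    liveRDDs.get(block.rddId).map(rdd => -rdd.partition(block.name).diskUsed).getOrElse(0L)
  }
val memoryDelta: Long =
  if (storageLevel != StorageLevel.NONE) {
    event.blockUpdatedInfo.memSize
  } else {
    liveRDDs.get(block.rddId).map(rdd => -rdd.partition(block.name).memoryUsed).getOrElse(0L)
  }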
@@ -810,7 +831,7 @@ private[spark] class AppStatusListener(
       // Finish updating the executor now that we know the delta in the number of blocks.
       maybeExec.foreach { exec =>
         exec.rddBlocks += rddBlocksDelta
-        maybeUpdate(exec, now)
+        update(exec, now)
       }
     }

core/src/main/scala/org/apache/spark/status/LiveEntity.scala (13 additions & 11 deletions)
@@ -20,11 +20,7 @@ package org.apache.spark.status
 import java.util.Date
 import java.util.concurrent.atomic.AtomicInteger
 
-import scala.collection.immutable.{HashSet, TreeSet}
-import scala.collection.mutable.HashMap
 
 import com.google.common.collect.Interners
 
 import org.apache.spark.JobExecutionStatus
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.scheduler.{AccumulableInfo, StageInfo, TaskInfo}
@@ -33,7 +29,9 @@ import org.apache.spark.storage.RDDInfo
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.AccumulatorContext
 import org.apache.spark.util.collection.OpenHashSet
 import org.apache.spark.util.kvstore.KVStore
 
+import scala.collection.immutable.{HashSet, TreeSet}
+import scala.collection.mutable.HashMap
 
Review comments on the import reordering:

Member:
nit: Is it necessary to change the order of the imports?

Author:
My IDE changed that order automatically. I think importing the classes in this order is fine, so I left it as is.

/**
* A mutable representation of a live entity in Spark (jobs, stages, tasks, et al). Every live
@@ -120,7 +118,7 @@ private class LiveTask(
     stageAttemptId: Int,
     lastUpdateTime: Option[Long]) extends LiveEntity {
 
-  import LiveEntityHelpers._
+  import org.apache.spark.status.LiveEntityHelpers._
 
   // The task metrics use a special value when no metrics have been reported. The special value is
   // checked when calculating indexed values when writing to the store (see [[TaskDataWrapper]]).

Review comment on this import (and the identical changes below):

Contributor:
None of these import changes are necessary...
@@ -313,7 +311,7 @@ private class LiveExecutorStageSummary(
     attemptId: Int,
     executorId: String) extends LiveEntity {
 
-  import LiveEntityHelpers._
+  import org.apache.spark.status.LiveEntityHelpers._
 
   var taskTime = 0L
   var succeededTasks = 0
@@ -347,7 +345,7 @@ private class LiveExecutorStageSummary(
 
 private class LiveStage extends LiveEntity {
 
-  import LiveEntityHelpers._
+  import org.apache.spark.status.LiveEntityHelpers._
 
   var jobs = Seq[LiveJob]()
   var jobIds = Set[Int]()
@@ -436,7 +434,7 @@ private class LiveStage extends LiveEntity {
 
 private class LiveRDDPartition(val blockName: String) {
 
-  import LiveEntityHelpers._
+  import org.apache.spark.status.LiveEntityHelpers._
 
   // Pointers used by RDDPartitionSeq.
   @volatile var prev: LiveRDDPartition = null
@@ -467,7 +465,7 @@ private class LiveRDDPartition(val blockName: String) {
 
 private class LiveRDDDistribution(exec: LiveExecutor) {
 
-  import LiveEntityHelpers._
+  import org.apache.spark.status.LiveEntityHelpers._
 
   val executorId = exec.executorId
   var memoryUsed = 0L
@@ -498,7 +496,7 @@ private class LiveRDDDistribution(exec: LiveExecutor) {
 
 private class LiveRDD(val info: RDDInfo) extends LiveEntity {
 
-  import LiveEntityHelpers._
+  import org.apache.spark.status.LiveEntityHelpers._
 
   var storageLevel: String = weakIntern(info.storageLevel.description)
   var memoryUsed = 0L
@@ -509,6 +507,10 @@ private class LiveRDD(val info: RDDInfo) extends LiveEntity {
 
   private val distributions = new HashMap[String, LiveRDDDistribution]()
 
+  def isEmpty(): Boolean = {
+    memoryUsed == 0L && diskUsed == 0L && partitions.isEmpty && distributions.isEmpty
+  }
+
   def setStorageLevel(level: String): Unit = {
     this.storageLevel = weakIntern(level)
   }
core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1549,7 +1549,7 @@ private[spark] class BlockManager(
     // TODO: Avoid a linear scan by creating another mapping of RDD.id to blocks.
     logInfo(s"Removing RDD $rddId")
     val blocksToRemove = blockInfoManager.entries.flatMap(_._1.asRDDId).filter(_.rddId == rddId)
-    blocksToRemove.foreach { blockId => removeBlock(blockId, tellMaster = false) }
+    blocksToRemove.foreach { blockId => removeBlock(blockId, tellMaster = true) }
     blocksToRemove.size
   }

Review comment on this change:

Contributor (vanzin):
I don't think this should be changing. Instead, the onUnpersistRDD callback in the listener should update the executor data it tracks when the RDD is being unpersisted. That's similar to what I understand was done in 2.2.
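
To illustrate the direction vanzin suggests (keep BlockManager unchanged and do the accounting in the listener), here is a hypothetical sketch of an onUnpersistRDD that walks the unpersisted RDD's tracked state and gives its storage back to the executors. The bulk accessors getDistributions() / getPartitions() on LiveRDD are assumed for the sake of the example; everything else (liveRDDs, liveExecutors, update, kvstore) is referenced elsewhere in this review.

// Hypothetical sketch of listener-side cleanup; not the diff under review.
override def onUnpersistRDD(event: SparkListenerUnpersistRDD): Unit = {
  liveRDDs.remove(event.rddId).foreach { rdd =>
    val now = System.nanoTime()
    // Subtract this RDD's memory/disk usage from every executor that hosted its blocks.
    rdd.getDistributions().values.foreach { dist =>
      liveExecutors.get(dist.executorId).foreach { exec =>
        exec.memoryUsed = math.max(0L, exec.memoryUsed - dist.memoryUsed)
        exec.diskUsed = math.max(0L, exec.diskUsed - dist.diskUsed)
        update(exec, now)
      }
    }
    // Decrement per-executor block counts using each partition's executor list.
    rdd.getPartitions().values.foreach { part =>
      part.executors.foreach { executorId =>
        liveExecutors.get(executorId).foreach { exec =>
          exec.rddBlocks = math.max(0, exec.rddBlocks - 1)
        }
      }
    }
  }
  kvstore.delete(classOf[RDDStorageInfoWrapper], event.rddId)
}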
