[SPARK-25091][Core] reduce the storage memory in Executor Tab when unpersist rdd #22335
Conversation
Can one of the admins verify this patch?
import scala.collection.immutable.{HashSet, TreeSet}
import scala.collection.mutable.HashMap
nit: Is it necessary to change the order of the imports?
My IDE changed the order automatically. I think importing the classes in order is fine, so I kept it.
override def onUnpersistRDD(event: SparkListenerUnpersistRDD): Unit = {
  liveRDDs.remove(event.rddId)
  kvstore.delete(classOf[RDDStorageInfoWrapper], event.rddId)
  while (true) {
This code may cause an infinite loop.
Since liveRDDs is just a HashMap (not a ConcurrentHashMap or similar), updates to liveRDDs made by other threads may not be visible inside this loop.
There's a single thread that calls this method (or any other listener method in this class). But I'm kinda wondering what's the point of the loop in the first place...
It seems you'll get into an infinite loop if for some reason rdd.isEmpty() returns false.
Thank you for letting me know this is called from a single thread.
I agree with your point: as you said, if rdd.isEmpty() keeps returning false, it will loop forever. I had assumed this code would run under multiple threads.
Hi kiszk, thank you for the thread-safety reminder.
What about updating the LiveExecutor instances in the onUnpersistRDD method by using the distributions stored in LiveRDD? As far as I can see, LiveRDDDistribution contains all the data that is needed.
That way you can revert the BlockManager and LiveEntity changes, and we can even get rid of the while(true) loop as well.
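To make the suggestion concrete, here is a minimal, self-contained sketch of the idea (e.g. paste into the Scala REPL). ExecUsage, RDDDistribution, and unpersistRDD are simplified stand-ins invented for this illustration, not Spark's LiveExecutor/LiveRDDDistribution classes; the real listener would do the equivalent subtraction inside onUnpersistRDD.

```scala
import scala.collection.mutable

// Simplified stand-ins for the listener's per-executor and per-RDD state.
// Field names mirror the real classes for readability, but these are not Spark types.
final case class ExecUsage(var memoryUsed: Long, var diskUsed: Long, var rddBlocks: Int)
final case class RDDDistribution(executorId: String, memoryUsed: Long, diskUsed: Long, blockCount: Int)

// On unpersist, walk the RDD's per-executor distributions once and subtract them
// from each executor's totals. A single pass suffices; no while(true) loop is needed.
def unpersistRDD(
    executors: mutable.Map[String, ExecUsage],
    distributions: Iterable[RDDDistribution]): Unit = {
  distributions.foreach { dist =>
    executors.get(dist.executorId).foreach { exec =>
      exec.memoryUsed = math.max(0L, exec.memoryUsed - dist.memoryUsed)
      exec.diskUsed = math.max(0L, exec.diskUsed - dist.diskUsed)
      exec.rddBlocks = math.max(0, exec.rddBlocks - dist.blockCount)
    }
  }
}
```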
vanzin left a comment
This will also need unit tests. A big problem with the previous code this replaced is how little test coverage it had. We should fix that as we find issues in the new code.
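As a hedged illustration of the kind of check such a test could make, here is a sketch written against the simplified stand-in model from the earlier snippet (plain assertions rather than Spark's test harness; a real test would drive AppStatusListener with SparkListenerUnpersistRDD events and inspect the resulting store state).

```scala
object UnpersistSketchTest {
  def main(args: Array[String]): Unit = {
    // One executor holding all of one RDD's cached data.
    val executors = scala.collection.mutable.Map(
      "exec-1" -> ExecUsage(memoryUsed = 100L, diskUsed = 50L, rddBlocks = 4))
    val rddDist = Seq(
      RDDDistribution("exec-1", memoryUsed = 100L, diskUsed = 50L, blockCount = 4))

    unpersistRDD(executors, rddDist)

    // After unpersist, the executor's storage usage should be back to zero.
    assert(executors("exec-1").memoryUsed == 0L)
    assert(executors("exec-1").diskUsed == 0L)
    assert(executors("exec-1").rddBlocks == 0)
    println("unpersist sketch assertions passed")
  }
}
```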
stageAttemptId: Int,
lastUpdateTime: Option[Long]) extends LiveEntity {

  import LiveEntityHelpers._
None of these import changes are necessary...
private val distributions = new HashMap[String, LiveRDDDistribution]()

def isEmpty(): Boolean = {
  memoryUsed == 0L && diskUsed == 0L && partitions.isEmpty && distributions.isEmpty
Shouldn't this be the equivalent of storageLevel == None instead? That's what 2.2 does:
// TODO: Avoid a linear scan by creating another mapping of RDD.id to blocks.
logInfo(s"Removing RDD $rddId")
val blocksToRemove = blockInfoManager.entries.flatMap(_._1.asRDDId).filter(_.rddId == rddId)
blocksToRemove.foreach { blockId => removeBlock(blockId, tellMaster = false) }
I don't think this should be changing. Instead, the onUnpersistRDD callback in the listener should be updating the executor data it's tracking when the RDD is being unpersisted. That's similar to what I understand was being done in 2.2.
val storageLevel = event.blockUpdatedInfo.storageLevel
val diskDelta = event.blockUpdatedInfo.diskSize * (if (storageLevel.useDisk) 1 else -1)
val memoryDelta = event.blockUpdatedInfo.memSize * (if (storageLevel.useMemory) 1 else -1)
val diskDelta: Long = storageLevel != StorageLevel.NONE match {
A simple if is much clearer (and probably faster) here. But see the later comment: I believe it's better for this code to live in the "unpersist" callback and to leave the block manager code unchanged.
This code is also wrong as it stands: if you're changing the storage level of an RDD (e.g. it was cached on disk, but after the update it's not), then this now does the wrong thing.
(So, another argument for treating the "unpersist" event differently, and in the appropriate callback.)
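For illustration, a minimal sketch of the "simple if" shape being suggested, with plain parameters standing in for the block-update fields (the names here are illustrative, not Spark's API):

```scala
// Returns the signed size change: positive if the new storage level still uses
// this store (memory or disk), negative if it no longer does.
def sizeDelta(size: Long, levelUsesThisStore: Boolean): Long =
  if (levelUsesThisStore) size else -size

// e.g. diskDelta = sizeDelta(blockUpdate.diskSize, storageLevel.useDisk)
```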
Hi vanzin, thanks a lot for your advice. I will now try to modify the code only in the unpersist method of the AppStatusListener class. I still have some questions, could you help explain:
1. As I understand it, a Spark task executes one RDD partition in one executor process. So why does the LiveRDDPartition class have a Seq of executors rather than just a single executor?
2. I believe we also need to reduce the RDD block count in the Executors tab when we unpersist an RDD, but RDD block info is currently held only by LiveExecutor, which in fact holds the block count across all RDDs. See line 842 of AppStatusListener: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala#L842
I think the condition 'exec.rddBlocks + rddBlocksDelta > 0' in that if block needs to be changed to 'LiveRDDDistribution.rddBlocks + rddBlocksDelta > 0', meaning we would use LiveRDDDistribution to hold the RDD block info for a particular RDD on a particular executor. Is that OK?
Also, this should be
Please fix the description for this PR - the top part contains the truncated title.
@cfangplus just for your information, #22341 is modifying similar parts in
Yeah, I hadn't noticed this; it looks like this is a duplicate of #22341.
viirya did a great job; it seems this PR should be closed.
@zsxwing
@vanzin
@attilapiros
What changes were proposed in this pull request?
This is a UI issue: when we unpersist an RDD or run UNCACHE TABLE, the storage memory size in the Executors tab is not reduced. So I modified the code as follows:
1. BlockManager sends the block status to BlockManagerMaster after it has removed the block.
2. AppStatusListener receives the block-removed status and updates the LiveRDD and LiveExecutor info, so the storage memory size and RDD block counts in the Executors tab change when we unpersist an RDD.
How was this patch tested?
Cache a table and then unpersist it, watching the WebUI on both the Storage and Executors tabs.
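For reference, a minimal way to reproduce this check in spark-shell (a sketch; the dataset size is arbitrary):

```scala
// Cache a dataset, check the Executors tab, then unpersist and check again.
val df = spark.range(0, 10000000L).cache()
df.count()           // materializes the cache; storage memory shows up in the Executors tab
df.unpersist(true)   // blocking unpersist; with this change the storage memory should drop
```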