[SPARK-24889][Core] Update block info when unpersist rdds #22341
Conversation
Test build #95708 has finished for PR 22341 at commit
retest this please.
```diff
 private[spark] object RDDInfo {
-  private val callsiteForm = SparkEnv.get.conf.get(EVENT_LOG_CALLSITE_FORM)
+  private lazy val callsiteForm = SparkEnv.get.conf.get(EVENT_LOG_CALLSITE_FORM)
```
Is this related to the problem?
I ran the test locally and this causes an error when initializing RDDInfo. Actually, I think this should be lazy because it is not always needed.
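A minimal sketch of the failure mode being discussed (the object and key names here are ours, not Spark's): a plain `val` in an object body runs during object initialization, so it blows up if the environment is not set up yet, while a `lazy val` defers the read to first access.

```scala
// Stand-in for SparkEnv: `conf` may still be null when other objects load.
object Env {
  var conf: Map[String, String] = null
}

object Info {
  // With a plain `val`, this line would dereference Env.conf during object
  // initialization and throw while conf is still null. `lazy val` waits
  // until the field is first read.
  lazy val callsiteForm: String = Env.conf.getOrElse("callsite.form", "short")
}

Env.conf = Map("callsite.form" -> "long") // environment ready before first access
println(Info.callsiteForm)
```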
Thanks for explaining.
Test build #95709 has finished for PR 22341 at commit
```scala
val partitions = liveRDD.getPartitions()

partitions.foreach { case (_, part) =>
  val executors = part.executors
```
In fact, each partition here contains only one executor, right? The executors Seq here is only a single-element sequence.
No, a partition can exist on more than one executor.
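A small illustration of that point (the types here are ours, not the listener's): with a replicated storage level such as `MEMORY_ONLY_2`, the same cached block is stored on two executors, so a partition's executor list can have more than one element.

```scala
// Hypothetical shape mirroring what the listener tracks per cached partition.
case class CachedPartition(blockName: String, executors: Seq[String])

// Block rdd_1_0 replicated to two executors under a *_2 storage level.
val part = CachedPartition("rdd_1_0", Seq("exec-1", "exec-2"))
println(part.executors.size)
```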
vanzin left a comment
Just some minor things.
```scala
import scala.collection.JavaConverters._
import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
```
nit: merge with above import
```diff
 override def onUnpersistRDD(event: SparkListenerUnpersistRDD): Unit = {
-  liveRDDs.remove(event.rddId)
+  liveRDDs.remove(event.rddId).foreach { liveRDD =>
+    val executorsToUpdate = new HashSet[LiveExecutor]()
```
I'd just call maybeUpdate directly when updating the executor. It's cheaper than inserting into a set, since a duplicate call will just compare the timestamp of the last update and then do nothing.
An executor can be updated first when processing distributions and again later when processing partitions. We actually want the duplicate call to perform an update. If we just call maybeUpdate directly, the duplicate call will do nothing and we will miss the updates from processing partitions.
Right. But it would be nice to avoid the hash set if possible. The less stuff listeners have to do, the better.
OK, good point. I've updated the change to remove the hash set.
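A hedged sketch of the agreed direction (variable and helper names are ours, not the patch's): apply each per-executor delta and write it out in the same pass, rather than collecting touched executors in a HashSet and flushing them at the end.

```scala
// Simplified stand-in for LiveExecutor's storage bookkeeping.
class Exec(val id: String, var usedOnHeap: Long)

val liveExecutors = Map(
  "e1" -> new Exec("e1", 100L),
  "e2" -> new Exec("e2", 50L))

// executorId -> bytes freed on that executor by unpersisting the RDD
val freedByDistribution = Map("e1" -> 30L, "e2" -> 80L)

freedByDistribution.foreach { case (id, bytes) =>
  liveExecutors.get(id).foreach { exec =>
    // Clamp at zero so usage never goes negative.
    exec.usedOnHeap = math.max(0L, exec.usedOnHeap - bytes)
    // maybeUpdate(exec)  // flush immediately; a later duplicate call is cheap
  }
}
println(liveExecutors("e1").usedOnHeap)
```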
```scala
maybeExec.foreach { exec =>
  if (exec.hasMemoryInfo) {
    if (storageLevel.useOffHeap) {
      exec.usedOffHeap = math.max(0, exec.usedOffHeap - rddDist.offHeapUsed)
```
I'd take the newValue method inside updateRDDBlock and make it a proper private method (with a better name), since then it becomes clearer why this logic is needed.
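One possible shape for the extracted helper (the name is ours, not from the patch): a named private method makes it clear the subtraction is clamped so usage never goes negative even if the bookkeeping drifts.

```scala
// Hypothetical replacement for the inline newValue logic: subtract the
// freed bytes from current usage, never dropping below zero.
def subtractUsage(current: Long, freed: Long): Long =
  math.max(0L, current - freed)

println(subtractUsage(100L, 30L)) // normal case
println(subtractUsage(10L, 30L))  // clamped at zero
```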
```scala
// Use RDD distribution to update executor memory and disk usage info.
distributions.foreach { case (executorId, rddDist) =>
  val maybeExec = liveExecutors.get(executorId)
```
You don't need the variable, right? liveExecutors.get(executorId).foreach is enough?
(Same with distributions and partitions and executors.)
Yeah, right.
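The variable-free form being suggested, as a tiny self-contained sketch (the map contents are ours): `Option#foreach` runs the body only when the key is present, so the intermediate val adds nothing.

```scala
val liveExecutors = Map("exec-1" -> "host-a")
var seen = List.empty[String]

liveExecutors.get("exec-1").foreach(host => seen ::= host) // present: body runs
liveExecutors.get("exec-9").foreach(host => seen ::= host) // absent: no-op

println(seen)
```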
```scala
}

def getPartitions(): Map[String, LiveRDDPartition] = {
  partitions.toMap
```
This makes a copy, no? Is there a need to make that copy?
I think this just creates an immutable map and inserts all the entries into it, so it doesn't copy the elements themselves.
But it makes a copy of the map, and that seems unnecessary. Of course it does not make a copy of the elements - it doesn't even know how to do that.
Do we prefer just to simply return the private mutable HashMap?
Sure, it's an internal API. Listener code needs to avoid doing unnecessary things like copying stuff to avoid issues with dropping events.
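A sketch of the two options weighed in this thread (the value types are ours for illustration): `toMap` builds a fresh immutable map on every call, while widening the return type to the read-only `scala.collection.Map` exposes the existing map without any copy.

```scala
import scala.collection.mutable

val partitions = mutable.HashMap("rdd_1_0" -> "MEMORY_ONLY")

def getPartitionsCopy: Map[String, String] = partitions.toMap        // copies the map
def getPartitions: scala.collection.Map[String, String] = partitions // no copy

println(getPartitions eq partitions)     // same underlying object
println(getPartitionsCopy eq partitions) // a new map on each call
```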
@vanzin Thanks for the good review! I've updated the PR to address all the comments.
Test build #95926 has finished for PR 22341 at commit
retest this please.
Test build #95937 has finished for PR 22341 at commit
```scala
  distributions.get(exec.executorId)
}

def getPartitions(): scala.collection.Map[String, LiveRDDPartition] = partitions
```
I'd have just exposed the fields but this is fine too.
Merging to master, 2.4 and 2.3.
## What changes were proposed in this pull request?

We update block info coming from executors at times such as caching an RDD. However, when removing RDDs by unpersisting them, we don't ask to update block info, so the block info is never updated.

We can fix this in a few ways:

1. Ask to update block info when unpersisting. This is the simplest option, but it changes driver-executor communication a bit.
2. Update block info when processing the unpersist event. We send a `SparkListenerUnpersistRDD` event when unpersisting an RDD; when processing this event, we can update the RDD's block info. This only changes event-processing code, so the risk seems lower.

This patch takes option 2 for the lower risk. If we agree the first option carries no risk, we can switch to it.

## How was this patch tested?

Unit tests.

Closes #22341 from viirya/SPARK-24889.

Authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: Marcelo Vanzin <[email protected]>
(cherry picked from commit 14f3ad2)
Signed-off-by: Marcelo Vanzin <[email protected]>
There was a trivial conflict in 2.3; I fixed it manually.
Thanks @vanzin |
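Putting the thread together, here is a hedged end-to-end sketch of option 2 (names simplified; this is not the patch's actual code): on the unpersist event, the listener drops the RDD's live state and subtracts its storage footprint from each executor that held it.

```scala
import scala.collection.mutable

// Simplified stand-ins for the listener's live-entity state.
class ExecStorage(val id: String, var memoryUsed: Long)
case class LiveRDDState(id: Int, distribution: Map[String, Long]) // executorId -> bytes

val liveRDDs = mutable.HashMap(1 -> LiveRDDState(1, Map("exec-1" -> 40L)))
val executors = Map("exec-1" -> new ExecStorage("exec-1", 100L))

// Handler for the unpersist event: remove the live RDD, then reclaim
// its cached bytes from every executor in its distribution.
def onUnpersistRDD(rddId: Int): Unit =
  liveRDDs.remove(rddId).foreach { rdd =>
    rdd.distribution.foreach { case (execId, bytes) =>
      executors.get(execId).foreach { exec =>
        exec.memoryUsed = math.max(0L, exec.memoryUsed - bytes)
      }
    }
  }

onUnpersistRDD(1)
println(executors("exec-1").memoryUsed) // the cached 40 bytes are reclaimed
```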