[SPARK-3137][Core] Replace the global TorrentBroadcast lock with fine grained KeyLock #25612
zsxwing wants to merge 5 commits into apache:master
| new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK) | ||
| Collections.synchronizedMap( | ||
| new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK) | ||
| .asInstanceOf[java.util.Map[Any, Any]] |
This one now needs protection as the global lock is removed. Casting it because the Scala compiler cannot figure it out.
nit: would it make the compiler happy and be nicer if the val was typed (val cachedValues: JMap[Any, Any]) instead of doing asInstanceOf?
No. The compiler doesn't like Java classes :P
| * Otherwise, if not running in a task then immediately release the lock. | ||
| */ | ||
| private def releaseLock(blockId: BlockId): Unit = { | ||
| private def releaseBlockManagerLock(blockId: BlockId): Unit = { |
Rename this method to avoid confusion.
cc @Ngone51
Test build #109880 has finished for PR 25612 at commit
| private def acquireTorrentBroadcastLock(broadcastId: BroadcastBlockId): Unit = { | ||
| while (true) { | ||
| val lock = torrentBroadcastLock.putIfAbsent(broadcastId, new Object) | ||
| if (lock == null) return |
Yep, when there is no such key in the ConcurrentHashMap.
Right OK, but then it isn't locking on anything the first time, how does this work?
- The caller that adds an object to ConcurrentHashMap successfully wins the lock, and acquireTorrentBroadcastLock will return. Others should wait until they add an object to ConcurrentHashMap successfully.
- In releaseTorrentBroadcastLock, it will remove the object from ConcurrentHashMap and wake up threads waiting for this object to retry.
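The two steps above can be sketched as a small standalone class. This is a minimal sketch assembled from the snippets quoted in this thread, not the merged Spark code: the class name `SimpleKeyLock` is hypothetical, and the body of `releaseLock` is an assumption based on the description above (remove the object, then wake up waiters).

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical name; a sketch of the per-key locking scheme discussed in this thread.
class SimpleKeyLock[K] {
  // The presence of an entry for a key means that key is currently locked.
  private val lockMap = new ConcurrentHashMap[K, AnyRef]()

  private def acquireLock(key: K): Unit = {
    while (true) {
      val lock = lockMap.putIfAbsent(key, new Object)
      // null means no entry existed, so this thread inserted its object and owns the key.
      if (lock == null) return
      lock.synchronized {
        // Wait while the same object is still in the map; the loop also
        // guards against spurious wakeups.
        while (lockMap.get(key) eq lock) {
          lock.wait()
        }
      }
      // The holder released the key: retry the putIfAbsent in the outer loop.
    }
  }

  // Assumed shape of the release path: remove the entry, then notify waiters.
  private def releaseLock(key: K): Unit = {
    val lock = lockMap.remove(key)
    lock.synchronized {
      lock.notifyAll()
    }
  }

  // Runs func while holding the lock for key, releasing it even if func throws.
  def withLock[T](key: K)(func: => T): T = {
    acquireLock(key)
    try func finally releaseLock(key)
  }
}
```

Because the entry is removed on release, the map only holds keys that are currently locked, which is why it does not grow with the number of broadcast IDs.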
Sorry I get it now. The Object isn't really the lock. It's the presence or absence of the Object. The Object is used to queue up waiting callers.
OK but this seems simpler if you computeIfAbsent a ReentrantLock and just use the lock as a lock directly. It manages all the details of allowing one caller, waking up waiters, etc. I'd imagine it's more straightforward to understand?
> OK but this seems simpler if you computeIfAbsent a ReentrantLock and just use the lock as a lock directly. It manages all the details of allowing one caller, waking up waiters, etc. I'd imagine it's more straightforward to understand?
The unlock code path is pretty tricky. It needs to unlock and remove the lock from the map (the removal allows another thread to put a new lock). I can try to make these two actions atomic by adding a global lock, but that's more complicated than the current way.
Isn't this all there is to it, more or less?
val lock = torrentBroadcastLock.computeIfAbsent(broadcastId, _ => new ReentrantLock())
lock.lock()
try {
  func
} finally {
  lock.unlock()
}
The downside I suppose is that you have a map of locks that keeps growing, but how many broadcast IDs would there be? vs a relatively small lock object for each.
The number of broadcast IDs is significant enough for a long running application. Even if the user doesn't use broadcast, we will still use it internally, for example, task binary is a broadcast object, a Spark SQL query may use broadcast join...
OK, makes sense. Well, could still make sense to use a Condition but it wouldn't change much.
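The trade-off in this exchange can be made concrete with a small sketch of the simpler alternative: one ReentrantLock per key, created with computeIfAbsent and never removed. The class name `GrowingKeyLocks` and its `size` method are hypothetical and exist only to show that the map keeps one entry per key ever locked.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.locks.ReentrantLock

// Hypothetical helper, not Spark code: locks are created on demand and never removed.
class GrowingKeyLocks[K] {
  private val locks = new ConcurrentHashMap[K, ReentrantLock]()

  // Runs func while holding the ReentrantLock associated with key.
  def withLock[T](key: K)(func: => T): T = {
    val lock = locks.computeIfAbsent(key, (_: K) => new ReentrantLock())
    lock.lock()
    try func finally lock.unlock()
  }

  // Number of lock objects retained so far.
  def size: Int = locks.size()
}
```

The locking itself is short and easy to read, but every distinct key leaves a lock object behind, which is the growth concern raised above for long running applications.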
| if (lock == null) return | ||
| lock.synchronized { | ||
| while (torrentBroadcastLock.get(broadcastId) eq lock) { | ||
| lock.wait() |
How about just using a Java Lock implementation rather than re-roll the signalling?
Using a <ReentrantLock, Condition> pair? The code would be more complicated.
No, just a ReentrantLock. I think you don't need to deal with spurious wakeups then either
I just tried ReentrantLock but didn't find a simple way to implement the same function.
Why not just stripe over a fixed number of ReentrantLocks? This should reduce contention enough (potential collisions can be controlled with numLocks), is easy to understand, and doesn't reimplement its own lock queues. It's also fully interruptible, whereas in the current implementation the threads queueing for the lock are not interruptible.
private val numLocks = 100
private val locks = (1 to numLocks).map(_ => new java.util.concurrent.locks.ReentrantLock)
private def withTorrentBroadcastLock[T](broadcastId: BroadcastBlockId)(func: => T): T = {
  // broadcastId.broadcastId is a Long, so convert the stripe index to Int.
  val lock = locks((broadcastId.broadcastId % numLocks).toInt)
  lock.lockInterruptibly()
  try {
    func
  } finally {
    lock.unlock()
  }
}
Test build #109916 has finished for PR 25612 at commit
Test build #109917 has finished for PR 25612 at commit
| val lock = lockMap.putIfAbsent(key, new Object) | ||
| if (lock == null) return | ||
| lock.synchronized { | ||
| while (lockMap.get(key) eq lock) { |
After the key lock is released, if a new thread for the same broadcastId puts a new object before another queueing thread checks lockMap.get(key) eq lock, could both threads end up holding the key lock?
The thread inside the inner while loop will exit it and retry the outer while loop. The line if (lock == null) return is the only exit. When acquireLock returns, this thread has successfully putIfAbsent a new object into lockMap.
@juliuszsompolski @srowen I added tests. Please take another look.
| @@ -56,7 +57,10 @@ private[spark] class BroadcastManager( | |||
| private val nextBroadcastId = new AtomicLong(0) | |||
| private[broadcast] val cachedValues = { | |||
You don't need the braces here, but I wouldn't bother unless you're changing this file again
Test build #109955 has finished for PR 25612 at commit
juliuszsompolski left a comment
LGTM!
Made two tiny nits reading through it.
| val broadcastCache = SparkEnv.get.broadcastManager.cachedValues | ||
| broadcastCache.synchronized { | ||
| TorrentBroadcast.torrentBroadcastLock.withLock(broadcastId) { | ||
| // As we just lock based on `broadcastId`, whenever using `broadcastCache`, we should just |
Test build #110051 has finished for PR 25612 at commit
Thanks! Merging to master.
A late LGTM :)
### What changes were proposed in this pull request?
Use `KeyLock` added in #25612 to simplify `MapOutputTracker.getStatuses`. It also has some improvements after the refactoring:
- `InterruptedException` is no longer swallowed.
- When a shuffle block is fetched, we don't need to wake up unrelated sleeping threads.

### Why are the changes needed?
`MapOutputTracker.getStatuses` is pretty hard to maintain right now because it has a special lock mechanism which we need to pay attention to whenever updating this method. As we can use `KeyLock` to hide the complexity of locking behind a dedicated lock class, it's better to refactor it to make it easy to understand and maintain.

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
Existing tests.

Closes #25680 from zsxwing/getStatuses.

Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
What changes were proposed in this pull request?
This PR provides a new lock mechanism KeyLock to lock with a given key. Also use this new lock in TorrentBroadcast to avoid blocking tasks from fetching different broadcast values.
Why are the changes needed?
TorrentBroadcast.readObject uses a global lock so only one task can be fetching the blocks at the same time. This is not optimal if we are running multiple stages concurrently because they should be able to independently fetch their own blocks.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Existing tests.