Skip to content

Commit 2c5b9b1

Browse files
squitoMarcelo Vanzin
authored andcommitted
[SPARK-22083][CORE] Release locks in MemoryStore.evictBlocksToFreeSpace
## What changes were proposed in this pull request? MemoryStore.evictBlocksToFreeSpace acquires write locks for all the blocks it intends to evict up front. If there is a failure to evict blocks (eg., some failure dropping a block to disk), then we have to release the lock. Otherwise the lock is never released and an executor trying to get the lock will wait forever. ## How was this patch tested? Added unit test. Author: Imran Rashid <[email protected]> Closes apache#19311 from squito/SPARK-22083.
1 parent 365a29b commit 2c5b9b1

File tree

2 files changed

+153
-13
lines changed

2 files changed

+153
-13
lines changed

core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala

Lines changed: 34 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ import org.apache.spark.internal.Logging
3232
import org.apache.spark.internal.config.{UNROLL_MEMORY_CHECK_PERIOD, UNROLL_MEMORY_GROWTH_FACTOR}
3333
import org.apache.spark.memory.{MemoryManager, MemoryMode}
3434
import org.apache.spark.serializer.{SerializationStream, SerializerManager}
35-
import org.apache.spark.storage.{BlockId, BlockInfoManager, StorageLevel, StreamBlockId}
35+
import org.apache.spark.storage._
3636
import org.apache.spark.unsafe.Platform
3737
import org.apache.spark.util.{SizeEstimator, Utils}
3838
import org.apache.spark.util.collection.SizeTrackingVector
@@ -544,20 +544,38 @@ private[spark] class MemoryStore(
544544
}
545545

546546
if (freedMemory >= space) {
547-
logInfo(s"${selectedBlocks.size} blocks selected for dropping " +
548-
s"(${Utils.bytesToString(freedMemory)} bytes)")
549-
for (blockId <- selectedBlocks) {
550-
val entry = entries.synchronized { entries.get(blockId) }
551-
// This should never be null as only one task should be dropping
552-
// blocks and removing entries. However the check is still here for
553-
// future safety.
554-
if (entry != null) {
555-
dropBlock(blockId, entry)
547+
var lastSuccessfulBlock = -1
548+
try {
549+
logInfo(s"${selectedBlocks.size} blocks selected for dropping " +
550+
s"(${Utils.bytesToString(freedMemory)} bytes)")
551+
(0 until selectedBlocks.size).foreach { idx =>
552+
val blockId = selectedBlocks(idx)
553+
val entry = entries.synchronized {
554+
entries.get(blockId)
555+
}
556+
// This should never be null as only one task should be dropping
557+
// blocks and removing entries. However the check is still here for
558+
// future safety.
559+
if (entry != null) {
560+
dropBlock(blockId, entry)
561+
afterDropAction(blockId)
562+
}
563+
lastSuccessfulBlock = idx
564+
}
565+
logInfo(s"After dropping ${selectedBlocks.size} blocks, " +
566+
s"free memory is ${Utils.bytesToString(maxMemory - blocksMemoryUsed)}")
567+
freedMemory
568+
} finally {
569+
// like BlockManager.doPut, we use a finally rather than a catch to avoid having to deal
570+
// with InterruptedException
571+
if (lastSuccessfulBlock != selectedBlocks.size - 1) {
572+
// the blocks we didn't process successfully are still locked, so we have to unlock them
573+
(lastSuccessfulBlock + 1 until selectedBlocks.size).foreach { idx =>
574+
val blockId = selectedBlocks(idx)
575+
blockInfoManager.unlock(blockId)
576+
}
556577
}
557578
}
558-
logInfo(s"After dropping ${selectedBlocks.size} blocks, " +
559-
s"free memory is ${Utils.bytesToString(maxMemory - blocksMemoryUsed)}")
560-
freedMemory
561579
} else {
562580
blockId.foreach { id =>
563581
logInfo(s"Will not store $id")
@@ -570,6 +588,9 @@ private[spark] class MemoryStore(
570588
}
571589
}
572590

591+
// hook for testing, so we can simulate a race
592+
protected def afterDropAction(blockId: BlockId): Unit = {}
593+
573594
def contains(blockId: BlockId): Boolean = {
574595
entries.synchronized { entries.containsKey(blockId) }
575596
}

core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -407,4 +407,123 @@ class MemoryStoreSuite
407407
})
408408
assert(memoryStore.getSize(blockId) === 10000)
409409
}
410+
411+
test("SPARK-22083: Release all locks in evictBlocksToFreeSpace") {
412+
// Setup a memory store with many blocks cached, and then one request which leads to multiple
413+
// blocks getting evicted. We'll make the eviction throw an exception, and make sure that
414+
// all locks are released.
415+
val ct = implicitly[ClassTag[Array[Byte]]]
416+
val numInitialBlocks = 10
417+
val memStoreSize = 100
418+
val bytesPerSmallBlock = memStoreSize / numInitialBlocks
419+
def testFailureOnNthDrop(numValidBlocks: Int, readLockAfterDrop: Boolean): Unit = {
420+
val tc = TaskContext.empty()
421+
val memManager = new StaticMemoryManager(conf, Long.MaxValue, memStoreSize, numCores = 1)
422+
val blockInfoManager = new BlockInfoManager
423+
blockInfoManager.registerTask(tc.taskAttemptId)
424+
var droppedSoFar = 0
425+
val blockEvictionHandler = new BlockEvictionHandler {
426+
var memoryStore: MemoryStore = _
427+
428+
override private[storage] def dropFromMemory[T: ClassTag](
429+
blockId: BlockId,
430+
data: () => Either[Array[T], ChunkedByteBuffer]): StorageLevel = {
431+
if (droppedSoFar < numValidBlocks) {
432+
droppedSoFar += 1
433+
memoryStore.remove(blockId)
434+
if (readLockAfterDrop) {
435+
// for testing purposes, we act like another thread gets the read lock on the new
436+
// block
437+
StorageLevel.DISK_ONLY
438+
} else {
439+
StorageLevel.NONE
440+
}
441+
} else {
442+
throw new RuntimeException(s"Mock error dropping block $droppedSoFar")
443+
}
444+
}
445+
}
446+
val memoryStore = new MemoryStore(conf, blockInfoManager, serializerManager, memManager,
447+
blockEvictionHandler) {
448+
override def afterDropAction(blockId: BlockId): Unit = {
449+
if (readLockAfterDrop) {
450+
// pretend that we get a read lock on the block (now on disk) in another thread
451+
TaskContext.setTaskContext(tc)
452+
blockInfoManager.lockForReading(blockId)
453+
TaskContext.unset()
454+
}
455+
}
456+
}
457+
458+
blockEvictionHandler.memoryStore = memoryStore
459+
memManager.setMemoryStore(memoryStore)
460+
461+
// Put in some small blocks to fill up the memory store
462+
val initialBlocks = (1 to numInitialBlocks).map { id =>
463+
val blockId = BlockId(s"rdd_1_$id")
464+
val blockInfo = new BlockInfo(StorageLevel.MEMORY_ONLY, ct, tellMaster = false)
465+
val initialWriteLock = blockInfoManager.lockNewBlockForWriting(blockId, blockInfo)
466+
assert(initialWriteLock)
467+
val success = memoryStore.putBytes(blockId, bytesPerSmallBlock, MemoryMode.ON_HEAP, () => {
468+
new ChunkedByteBuffer(ByteBuffer.allocate(bytesPerSmallBlock))
469+
})
470+
assert(success)
471+
blockInfoManager.unlock(blockId, None)
472+
}
473+
assert(blockInfoManager.size === numInitialBlocks)
474+
475+
476+
// Add one big block, which will require evicting everything in the memorystore. However our
477+
// mock BlockEvictionHandler will throw an exception -- make sure all locks are cleared.
478+
val largeBlockId = BlockId(s"rdd_2_1")
479+
val largeBlockInfo = new BlockInfo(StorageLevel.MEMORY_ONLY, ct, tellMaster = false)
480+
val initialWriteLock = blockInfoManager.lockNewBlockForWriting(largeBlockId, largeBlockInfo)
481+
assert(initialWriteLock)
482+
if (numValidBlocks < numInitialBlocks) {
483+
val exc = intercept[RuntimeException] {
484+
memoryStore.putBytes(largeBlockId, memStoreSize, MemoryMode.ON_HEAP, () => {
485+
new ChunkedByteBuffer(ByteBuffer.allocate(memStoreSize))
486+
})
487+
}
488+
assert(exc.getMessage().startsWith("Mock error dropping block"), exc)
489+
// BlockManager.doPut takes care of releasing the lock for the newly written block -- not
490+
// testing that here, so do it manually
491+
blockInfoManager.removeBlock(largeBlockId)
492+
} else {
493+
memoryStore.putBytes(largeBlockId, memStoreSize, MemoryMode.ON_HEAP, () => {
494+
new ChunkedByteBuffer(ByteBuffer.allocate(memStoreSize))
495+
})
496+
// BlockManager.doPut takes care of releasing the lock for the newly written block -- not
497+
// testing that here, so do it manually
498+
blockInfoManager.unlock(largeBlockId)
499+
}
500+
501+
val largeBlockInMemory = if (numValidBlocks == numInitialBlocks) 1 else 0
502+
val expBlocks = numInitialBlocks +
503+
(if (readLockAfterDrop) 0 else -numValidBlocks) +
504+
largeBlockInMemory
505+
assert(blockInfoManager.size === expBlocks)
506+
507+
val blocksStillInMemory = blockInfoManager.entries.filter { case (id, info) =>
508+
assert(info.writerTask === BlockInfo.NO_WRITER, id)
509+
// in this test, all the blocks in memory have no reader, but everything dropped to disk
510+
// had another thread read the block. We shouldn't lose the other thread's reader lock.
511+
if (memoryStore.contains(id)) {
512+
assert(info.readerCount === 0, id)
513+
true
514+
} else {
515+
assert(info.readerCount === 1, id)
516+
false
517+
}
518+
}
519+
assert(blocksStillInMemory.size ===
520+
(numInitialBlocks - numValidBlocks + largeBlockInMemory))
521+
}
522+
523+
Seq(0, 3, numInitialBlocks).foreach { failAfterDropping =>
524+
Seq(true, false).foreach { readLockAfterDropping =>
525+
testFailureOnNthDrop(failAfterDropping, readLockAfterDropping)
526+
}
527+
}
528+
}
410529
}

0 commit comments

Comments
 (0)