Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -148,20 +148,26 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite {
val dummyBlock = TestBlockId("lonely water")
val (mm, ms) = makeThings(Long.MaxValue, maxStorageMem)
assert(mm.acquireUnrollMemory(dummyBlock, 100L, evictedBlocks))
when(ms.currentUnrollMemory).thenReturn(100L)
assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(mm.storageMemoryUsed === 100L)
mm.releaseUnrollMemory(40L)
assert(mm.storageMemoryUsed === 60L)
when(ms.currentUnrollMemory).thenReturn(60L)
assert(mm.acquireUnrollMemory(dummyBlock, 500L, evictedBlocks))
// `spark.storage.unrollFraction` is 0.4, so the max unroll space is 400 bytes.
// Since we already occupy 60 bytes, we will try to ensure only 400 - 60 = 340 bytes.
assert(mm.acquireStorageMemory(dummyBlock, 800L, evictedBlocks))
assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(mm.storageMemoryUsed === 560L)
when(ms.currentUnrollMemory).thenReturn(560L)
assert(mm.storageMemoryUsed === 860L)
// `spark.storage.unrollFraction` is 0.4, so the max unroll space is 400 bytes.
// Since we already occupy 60 bytes, we will try to evict only 400 - 60 = 340 bytes.
assert(!mm.acquireUnrollMemory(dummyBlock, 800L, evictedBlocks))
assert(mm.storageMemoryUsed === 560L)
// We already have 560 bytes > the max unroll space of 400 bytes, so no bytes are freed
assertEvictBlocksToFreeSpaceCalled(ms, 340L)
assert(mm.storageMemoryUsed === 520L)
// Acquire more unroll memory to exceed our "max unroll space"
assert(mm.acquireUnrollMemory(dummyBlock, 440L, evictedBlocks))
when(ms.currentUnrollMemory).thenReturn(500L)
assert(mm.storageMemoryUsed === 960L)
assert(!mm.acquireUnrollMemory(dummyBlock, 300L, evictedBlocks))
// We already have 500 bytes > the max unroll space of 400 bytes, so no bytes are freed
assertEvictBlocksToFreeSpaceNotCalled(ms)
// Release beyond what was acquired
mm.releaseUnrollMemory(maxStorageMem)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,8 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
// Execution wants 200 bytes but only 150 are free, so storage is evicted
assert(mm.acquireExecutionMemory(200L, taskAttemptId, MemoryMode.ON_HEAP) === 200L)
assert(mm.executionMemoryUsed === 300L)
assert(mm.storageMemoryUsed === 700L)
assertEvictBlocksToFreeSpaceCalled(ms, 50L)
assert(mm.executionMemoryUsed === 300L)
assert(evictedBlocks.nonEmpty)
mm.releaseAllStorageMemory()
evictedBlocks.clear()
Expand All @@ -152,18 +152,21 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
assert(evictedBlocks.isEmpty)
}

test("execution can evict storage blocks when storage memory is below max mem (SPARK-12165)") {
test("execution memory requests smaller than free memory should evict storage (SPARK-12165)") {
val maxMemory = 1000L
val taskAttemptId = 0L
val (mm, ms) = makeThings(maxMemory)
// Acquire enough storage memory to exceed the storage region size
assert(mm.acquireStorageMemory(dummyBlock, 750L, evictedBlocks))
assert(mm.acquireStorageMemory(dummyBlock, 700L, evictedBlocks))
assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(mm.executionMemoryUsed === 0L)
assert(mm.storageMemoryUsed === 750L)
// Should now be able to require up to 500 bytes of memory
assert(mm.storageMemoryUsed === 700L)
// SPARK-12165: previously, MemoryStore would not evict anything because it would
// mistakenly think that the 300 bytes of free space was still available even after
// using it to expand the execution pool. Consequently, no storage memory was released
// and the following call granted only 300 bytes to execution.
assert(mm.acquireExecutionMemory(500L, taskAttemptId, MemoryMode.ON_HEAP) === 500L)
assertEvictBlocksToFreeSpaceCalled(ms, 250L)
assertEvictBlocksToFreeSpaceCalled(ms, 200L)
assert(mm.storageMemoryUsed === 500L)
assert(mm.executionMemoryUsed === 500L)
assert(evictedBlocks.nonEmpty)
Expand Down