diff --git a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
index a97e70d7aab19..60e7705631d29 100644
--- a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
@@ -148,20 +148,26 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite {
     val dummyBlock = TestBlockId("lonely water")
     val (mm, ms) = makeThings(Long.MaxValue, maxStorageMem)
     assert(mm.acquireUnrollMemory(dummyBlock, 100L, evictedBlocks))
+    when(ms.currentUnrollMemory).thenReturn(100L)
     assertEvictBlocksToFreeSpaceNotCalled(ms)
     assert(mm.storageMemoryUsed === 100L)
     mm.releaseUnrollMemory(40L)
     assert(mm.storageMemoryUsed === 60L)
     when(ms.currentUnrollMemory).thenReturn(60L)
-    assert(mm.acquireUnrollMemory(dummyBlock, 500L, evictedBlocks))
-    // `spark.storage.unrollFraction` is 0.4, so the max unroll space is 400 bytes.
-    // Since we already occupy 60 bytes, we will try to ensure only 400 - 60 = 340 bytes.
+    assert(mm.acquireStorageMemory(dummyBlock, 800L, evictedBlocks))
     assertEvictBlocksToFreeSpaceNotCalled(ms)
-    assert(mm.storageMemoryUsed === 560L)
-    when(ms.currentUnrollMemory).thenReturn(560L)
+    assert(mm.storageMemoryUsed === 860L)
+    // `spark.storage.unrollFraction` is 0.4, so the max unroll space is 400 bytes.
+    // Since we already occupy 60 bytes, we will try to evict only 400 - 60 = 340 bytes.
     assert(!mm.acquireUnrollMemory(dummyBlock, 800L, evictedBlocks))
-    assert(mm.storageMemoryUsed === 560L)
-    // We already have 560 bytes > the max unroll space of 400 bytes, so no bytes are freed
+    assertEvictBlocksToFreeSpaceCalled(ms, 340L)
+    assert(mm.storageMemoryUsed === 520L)
+    // Acquire more unroll memory to exceed our "max unroll space"
+    assert(mm.acquireUnrollMemory(dummyBlock, 440L, evictedBlocks))
+    when(ms.currentUnrollMemory).thenReturn(500L)
+    assert(mm.storageMemoryUsed === 960L)
+    assert(!mm.acquireUnrollMemory(dummyBlock, 300L, evictedBlocks))
+    // We already have 500 bytes > the max unroll space of 400 bytes, so no bytes are freed
     assertEvictBlocksToFreeSpaceNotCalled(ms)
     // Release beyond what was acquired
     mm.releaseUnrollMemory(maxStorageMem)
diff --git a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
index 3d259d3058457..1f2500b1c5043 100644
--- a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
@@ -131,8 +131,8 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
     // Execution wants 200 bytes but only 150 are free, so storage is evicted
     assert(mm.acquireExecutionMemory(200L, taskAttemptId, MemoryMode.ON_HEAP) === 200L)
     assert(mm.executionMemoryUsed === 300L)
+    assert(mm.storageMemoryUsed === 700L)
     assertEvictBlocksToFreeSpaceCalled(ms, 50L)
-    assert(mm.executionMemoryUsed === 300L)
     assert(evictedBlocks.nonEmpty)
     mm.releaseAllStorageMemory()
     evictedBlocks.clear()
@@ -152,18 +152,21 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
     assert(evictedBlocks.isEmpty)
   }

-  test("execution can evict storage blocks when storage memory is below max mem (SPARK-12165)") {
+  test("execution memory requests smaller than free memory should evict storage (SPARK-12165)") {
     val maxMemory = 1000L
     val taskAttemptId = 0L
     val (mm, ms) = makeThings(maxMemory)
     // Acquire enough storage memory to exceed the storage region size
-    assert(mm.acquireStorageMemory(dummyBlock, 750L, evictedBlocks))
+    assert(mm.acquireStorageMemory(dummyBlock, 700L, evictedBlocks))
     assertEvictBlocksToFreeSpaceNotCalled(ms)
     assert(mm.executionMemoryUsed === 0L)
-    assert(mm.storageMemoryUsed === 750L)
-    // Should now be able to require up to 500 bytes of memory
+    assert(mm.storageMemoryUsed === 700L)
+    // SPARK-12165: previously, MemoryStore would not evict anything because it would
+    // mistakenly think that the 300 bytes of free space was still available even after
+    // using it to expand the execution pool. Consequently, no storage memory was released
+    // and the following call granted only 300 bytes to execution.
     assert(mm.acquireExecutionMemory(500L, taskAttemptId, MemoryMode.ON_HEAP) === 500L)
-    assertEvictBlocksToFreeSpaceCalled(ms, 250L)
+    assertEvictBlocksToFreeSpaceCalled(ms, 200L)
     assert(mm.storageMemoryUsed === 500L)
     assert(mm.executionMemoryUsed === 500L)
     assert(evictedBlocks.nonEmpty)
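
Not part of the patch: the sketch below re-derives the arithmetic behind the new StaticMemoryManagerSuite assertions (340 bytes evicted, 520 then 960 bytes of storage used). It assumes `maxStorageMem = 1000L` and `spark.storage.unrollFraction = 0.4`, which is implied by the test's "max unroll space is 400 bytes" comment; the object and variable names are illustrative only and do not exist in the suites.

```scala
// Standalone sketch, assuming maxStorageMem = 1000L and unrollFraction = 0.4 as in the test comments.
object UnrollAccountingSketch {
  def main(args: Array[String]): Unit = {
    val maxStorageMem = 1000L                                      // assumed from the 400-byte max unroll space
    val unrollFraction = 0.4                                       // spark.storage.unrollFraction
    val maxUnrollSpace = (maxStorageMem * unrollFraction).toLong   // 400

    val currentUnrollMemory = 60L                                  // stubbed via when(ms.currentUnrollMemory)
    val storageUsedBeforeRequest = 860L                            // 60 (unroll) + 800 (storage block)

    // A too-large unroll request may evict at most (maxUnrollSpace - currentUnrollMemory) bytes.
    val spaceToFree = maxUnrollSpace - currentUnrollMemory                  // 400 - 60 = 340
    val storageUsedAfterEviction = storageUsedBeforeRequest - spaceToFree   // 860 - 340 = 520

    println(s"evict up to $spaceToFree bytes; storageMemoryUsed becomes $storageUsedAfterEviction")
    // Acquiring 440 more unroll bytes then gives 520 + 440 = 960 bytes used and 500 bytes of
    // unroll memory, which already exceeds maxUnrollSpace, so later unroll requests evict nothing.
  }
}
```

The same kind of arithmetic explains the UnifiedMemoryManagerSuite change: with 700 of 1000 bytes held by storage, a 500-byte execution request finds only 300 bytes free, so 500 - 300 = 200 bytes of storage are evicted, leaving 500 bytes each for storage and execution.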