diff --git a/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java b/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java index f53bc0b02bbf..3251c14604d3 100644 --- a/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java +++ b/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java @@ -17,6 +17,7 @@ package org.apache.spark.memory; +import org.junit.After; import org.junit.Assert; import org.junit.Test; @@ -25,9 +26,19 @@ public class TaskMemoryManagerSuite { + private TaskMemoryManager manager; + + @After + public void after() { + Assert.assertNotNull(manager); + Assert.assertEquals(0, manager.getMemoryConsumptionForThisTask()); + Assert.assertEquals(0, manager.cleanUpAllAllocatedMemory()); + manager = null; + } + @Test public void leakedPageMemoryIsDetected() { - final TaskMemoryManager manager = new TaskMemoryManager( + manager = new TaskMemoryManager( new StaticMemoryManager( new SparkConf().set("spark.memory.offHeap.enabled", "false"), Long.MAX_VALUE, @@ -38,6 +49,7 @@ public void leakedPageMemoryIsDetected() { manager.allocatePage(4096, c); // leak memory Assert.assertEquals(4096, manager.getMemoryConsumptionForThisTask()); Assert.assertEquals(4096, manager.cleanUpAllAllocatedMemory()); + Assert.assertEquals(0, manager.cleanUpAllAllocatedMemory()); } @Test @@ -45,7 +57,7 @@ public void encodePageNumberAndOffsetOffHeap() { final SparkConf conf = new SparkConf() .set("spark.memory.offHeap.enabled", "true") .set("spark.memory.offHeap.size", "1000"); - final TaskMemoryManager manager = new TaskMemoryManager(new TestMemoryManager(conf), 0); + manager = new TaskMemoryManager(new TestMemoryManager(conf), 0); final MemoryConsumer c = new TestMemoryConsumer(manager, MemoryMode.OFF_HEAP); final MemoryBlock dataPage = manager.allocatePage(256, c); // In off-heap mode, an offset is an absolute address that may require more than 51 bits to @@ -58,7 +70,7 @@ public void encodePageNumberAndOffsetOffHeap() { @Test public void encodePageNumberAndOffsetOnHeap() { - final TaskMemoryManager manager = new TaskMemoryManager( + manager = new TaskMemoryManager( new TestMemoryManager(new SparkConf().set("spark.memory.offHeap.enabled", "false")), 0); final MemoryConsumer c = new TestMemoryConsumer(manager, MemoryMode.ON_HEAP); final MemoryBlock dataPage = manager.allocatePage(256, c); @@ -71,7 +83,7 @@ public void encodePageNumberAndOffsetOnHeap() { public void cooperativeSpilling() { final TestMemoryManager memoryManager = new TestMemoryManager(new SparkConf()); memoryManager.limit(100); - final TaskMemoryManager manager = new TaskMemoryManager(memoryManager, 0); + manager = new TaskMemoryManager(memoryManager, 0); TestMemoryConsumer c1 = new TestMemoryConsumer(manager); TestMemoryConsumer c2 = new TestMemoryConsumer(manager); @@ -106,14 +118,13 @@ public void cooperativeSpilling() { c1.free(0); c2.free(100); - Assert.assertEquals(0, manager.cleanUpAllAllocatedMemory()); } @Test public void cooperativeSpilling2() { final TestMemoryManager memoryManager = new TestMemoryManager(new SparkConf()); memoryManager.limit(100); - final TaskMemoryManager manager = new TaskMemoryManager(memoryManager, 0); + manager = new TaskMemoryManager(memoryManager, 0); TestMemoryConsumer c1 = new TestMemoryConsumer(manager); TestMemoryConsumer c2 = new TestMemoryConsumer(manager); @@ -141,14 +152,13 @@ public void cooperativeSpilling2() { c1.free(0); c2.free(80); c3.free(10); - Assert.assertEquals(0, manager.cleanUpAllAllocatedMemory()); } @Test public void shouldNotForceSpillingInDifferentModes() { final TestMemoryManager memoryManager = new TestMemoryManager(new SparkConf()); memoryManager.limit(100); - final TaskMemoryManager manager = new TaskMemoryManager(memoryManager, 0); + manager = new TaskMemoryManager(memoryManager, 0); TestMemoryConsumer c1 = new TestMemoryConsumer(manager, MemoryMode.ON_HEAP); TestMemoryConsumer c2 = new TestMemoryConsumer(manager, MemoryMode.OFF_HEAP); @@ -170,7 +180,7 @@ public void offHeapConfigurationBackwardsCompatibility() { final SparkConf conf = new SparkConf() .set("spark.unsafe.offHeap", "true") .set("spark.memory.offHeap.size", "1000"); - final TaskMemoryManager manager = new TaskMemoryManager(new TestMemoryManager(conf), 0); + manager = new TaskMemoryManager(new TestMemoryManager(conf), 0); Assert.assertSame(MemoryMode.OFF_HEAP, manager.tungstenMemoryMode); }