From 1c4a6af2077e7ac50476101031aa1af99ff4f7b7 Mon Sep 17 00:00:00 2001 From: Eyal Farago Date: Sat, 19 May 2018 01:06:15 +0300 Subject: [PATCH 01/18] SPARK-22713__ExternalAppendOnlyMap_effective_spill: add failing test. --- .../collection/ExternalAppendOnlyMap.scala | 10 ++-- .../ExternalAppendOnlyMapSuite.scala | 52 ++++++++++++++++++- 2 files changed, 55 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index 5c6dd45ec58e3..072ecf11743d5 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -20,12 +20,12 @@ package org.apache.spark.util.collection import java.io._ import java.util.Comparator +import com.google.common.annotations.VisibleForTesting + import scala.collection.BufferedIterator import scala.collection.mutable import scala.collection.mutable.ArrayBuffer - import com.google.common.io.ByteStreams - import org.apache.spark.{SparkEnv, TaskContext} import org.apache.spark.annotation.DeveloperApi import org.apache.spark.executor.ShuffleWriteMetrics @@ -80,7 +80,7 @@ class ExternalAppendOnlyMap[K, V, C]( this(createCombiner, mergeValue, mergeCombiners, serializer, blockManager, TaskContext.get()) } - @volatile private var currentMap = new SizeTrackingAppendOnlyMap[K, C] + @VisibleForTesting @volatile private[collection] var currentMap = new SizeTrackingAppendOnlyMap[K, C] private val spilledMaps = new ArrayBuffer[DiskMapIterator] private val sparkConf = SparkEnv.get.conf private val diskBlockManager = blockManager.diskBlockManager @@ -114,7 +114,7 @@ class ExternalAppendOnlyMap[K, V, C]( private val keyComparator = new HashComparator[K] private val ser = serializer.newInstance() - @volatile private var readingIterator: SpillableIterator = null + @VisibleForTesting @volatile private[collection] var readingIterator: SpillableIterator = null /** * Number of files this map has spilled so far. @@ -568,7 +568,7 @@ class ExternalAppendOnlyMap[K, V, C]( context.addTaskCompletionListener(context => cleanup()) } - private[this] class SpillableIterator(var upstream: Iterator[(K, C)]) + @VisibleForTesting private[collection] class SpillableIterator(var upstream: Iterator[(K, C)]) extends Iterator[(K, C)] { private val SPILL_LOCK = new Object() diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala index 35312f2d71131..748eebbd8021a 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala @@ -18,13 +18,13 @@ package org.apache.spark.util.collection import scala.collection.mutable.ArrayBuffer - import org.apache.spark._ import org.apache.spark.internal.config._ import org.apache.spark.io.CompressionCodec import org.apache.spark.memory.MemoryTestingUtils +import org.apache.spark.util.CompletionIterator -class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext { +class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext{ import TestUtils.{assertNotSpilled, assertSpilled} private val allCompressionCodecs = CompressionCodec.ALL_COMPRESSION_CODECS @@ -414,6 +414,54 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext { sc.stop() } + test("spill during iteration") { + val size = 1000 + val conf = createSparkConf(loadDefaults = true) + sc = new SparkContext("local-cluster[1,1,1024]", "test", conf) + val map = createExternalMap[Int] + + map.insertAll((0 until size).iterator.map(i => (i / 10, i))) + assert(map.numSpills == 0, "map was not supposed to spill") + + val it = map.iterator + assert( it.isInstanceOf[CompletionIterator[_, _]]) + val underlyingIt = map.readingIterator + assert( underlyingIt != null ) + val underlyingMapIterator = underlyingIt.upstream + assert(underlyingMapIterator != null) + val underlyingMapIteratorClass = underlyingMapIterator.getClass + assert(underlyingMapIteratorClass.getEnclosingClass == classOf[AppendOnlyMap[_,_]]) + + val underlyingMap = map.currentMap + assert(underlyingMap != null) + + val first50Keys = for( _ <- 0 until 50) yield { + val (k,vs) = it.next + val sortedVs = vs.sorted + assert(sortedVs.seq == (0 until 10).map(10 * k + _)) + k + } + assert( map.numSpills == 0 ) + map.spill(Long.MaxValue, null) + //assert( map.numSpills == 1) + //these asserts basically try to show that we're no longer holding references to the underlying AppendOnlyMap + //it'd be nice to use something like https://github.com/scala/scala/blob/2.13.x/test/junit/scala/tools/testing/AssertUtil.scala#L69-89 + assert(map.currentMap == null) + assert(underlyingIt.upstream ne underlyingMapIterator) + assert(underlyingIt.upstream.getClass != underlyingMapIteratorClass) + assert(underlyingIt.upstream.getClass.getEnclosingClass != classOf[AppendOnlyMap[_,_]]) + + val next50Keys = for( _ <- 0 until 50) yield { + val (k,vs) = it.next + val sortedVs = vs.sorted + assert(sortedVs.seq == (0 until 10).map(10 * k + _)) + k + } + assert(!it.hasNext) + val keys = (first50Keys ++ next50Keys).sorted + assert(keys == (0 until 100)) + } + test("external aggregation updates peak execution memory") { val spillThreshold = 1000 val conf = createSparkConf(loadDefaults = false) From 82591e63bf30fad37b90956c226101e428d39787 Mon Sep 17 00:00:00 2001 From: Eyal Farago Date: Sat, 19 May 2018 01:21:33 +0300 Subject: [PATCH 02/18] SPARK-22713__ExternalAppendOnlyMap_effective_spill: fix the issue by removing the reference to the initial iterator. --- .../spark/util/collection/ExternalAppendOnlyMap.scala | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index 072ecf11743d5..7493785a91382 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -573,7 +573,7 @@ class ExternalAppendOnlyMap[K, V, C]( private val SPILL_LOCK = new Object() - private var nextUpstream: Iterator[(K, C)] = null + //private var nextUpstream: Iterator[(K, C)] = null private var cur: (K, C) = readNext() @@ -585,17 +585,15 @@ class ExternalAppendOnlyMap[K, V, C]( } else { logInfo(s"Task ${context.taskAttemptId} force spilling in-memory map to disk and " + s"it will release ${org.apache.spark.util.Utils.bytesToString(getUsed())} memory") - nextUpstream = spillMemoryIteratorToDisk(upstream) + val nextUpstream = spillMemoryIteratorToDisk(upstream) + assert(!upstream.hasNext) hasSpilled = true + upstream = nextUpstream true } } def readNext(): (K, C) = SPILL_LOCK.synchronized { - if (nextUpstream != null) { - upstream = nextUpstream - nextUpstream = null - } if (upstream.hasNext) { upstream.next() } else { From 72f6386bb07bac78f32df5085c6d6a515c14ffa0 Mon Sep 17 00:00:00 2001 From: Eyal Farago Date: Sat, 19 May 2018 08:29:27 +0300 Subject: [PATCH 03/18] SPARK-22713__ExternalAppendOnlyMap_effective_spill: fix scalastyle issues. --- .../collection/ExternalAppendOnlyMap.scala | 22 +++++++++++++------ 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index 7493785a91382..240f9cd8edba2 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -20,12 +20,12 @@ package org.apache.spark.util.collection import java.io._ import java.util.Comparator -import com.google.common.annotations.VisibleForTesting - import scala.collection.BufferedIterator import scala.collection.mutable import scala.collection.mutable.ArrayBuffer + import com.google.common.io.ByteStreams + import org.apache.spark.{SparkEnv, TaskContext} import org.apache.spark.annotation.DeveloperApi import org.apache.spark.executor.ShuffleWriteMetrics @@ -80,7 +80,10 @@ class ExternalAppendOnlyMap[K, V, C]( this(createCombiner, mergeValue, mergeCombiners, serializer, blockManager, TaskContext.get()) } - @VisibleForTesting @volatile private[collection] var currentMap = new SizeTrackingAppendOnlyMap[K, C] + /** + * Exposed for testing + */ + @volatile private[collection] var currentMap = new SizeTrackingAppendOnlyMap[K, C] private val spilledMaps = new ArrayBuffer[DiskMapIterator] private val sparkConf = SparkEnv.get.conf private val diskBlockManager = blockManager.diskBlockManager @@ -106,6 +109,7 @@ class ExternalAppendOnlyMap[K, V, C]( // Write metrics private val writeMetrics: ShuffleWriteMetrics = new ShuffleWriteMetrics() + private val writeMetrics: ShuffleWriteMetrics = new ShuffleWriteMetrics() // Peak size of the in-memory map observed so far, in bytes private var _peakMemoryUsedBytes: Long = 0L @@ -114,7 +118,10 @@ class ExternalAppendOnlyMap[K, V, C]( private val keyComparator = new HashComparator[K] private val ser = serializer.newInstance() - @VisibleForTesting @volatile private[collection] var readingIterator: SpillableIterator = null + /** + * Exposed for testing + */ + @volatile private[collection] var readingIterator: SpillableIterator = null /** * Number of files this map has spilled so far. @@ -568,13 +575,14 @@ class ExternalAppendOnlyMap[K, V, C]( context.addTaskCompletionListener(context => cleanup()) } - @VisibleForTesting private[collection] class SpillableIterator(var upstream: Iterator[(K, C)]) + /** + * Exposed for testing + */ + private[collection] class SpillableIterator(var upstream: Iterator[(K, C)]) extends Iterator[(K, C)] { private val SPILL_LOCK = new Object() - //private var nextUpstream: Iterator[(K, C)] = null - private var cur: (K, C) = readNext() private var hasSpilled: Boolean = false From 48224d95ea0be669065a6ae9c9b80b128c27063b Mon Sep 17 00:00:00 2001 From: Eyal Farago Date: Sat, 19 May 2018 08:49:05 +0300 Subject: [PATCH 04/18] SPARK-22713__ExternalAppendOnlyMap_effective_spill: some more styling issues. --- .../ExternalAppendOnlyMapSuite.scala | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala index 748eebbd8021a..4b5e0a1280ca1 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.util.collection import scala.collection.mutable.ArrayBuffer + import org.apache.spark._ import org.apache.spark.internal.config._ import org.apache.spark.io.CompressionCodec @@ -430,29 +431,30 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext{ val underlyingMapIterator = underlyingIt.upstream assert(underlyingMapIterator != null) val underlyingMapIteratorClass = underlyingMapIterator.getClass - assert(underlyingMapIteratorClass.getEnclosingClass == classOf[AppendOnlyMap[_,_]]) + assert(underlyingMapIteratorClass.getEnclosingClass == classOf[AppendOnlyMap[_, _]]) val underlyingMap = map.currentMap assert(underlyingMap != null) - val first50Keys = for( _ <- 0 until 50) yield { - val (k,vs) = it.next + val first50Keys = for ( _ <- 0 until 50) yield { + val (k, vs) = it.next val sortedVs = vs.sorted assert(sortedVs.seq == (0 until 10).map(10 * k + _)) k } assert( map.numSpills == 0 ) map.spill(Long.MaxValue, null) - //assert( map.numSpills == 1) - //these asserts basically try to show that we're no longer holding references to the underlying AppendOnlyMap - //it'd be nice to use something like https://github.com/scala/scala/blob/2.13.x/test/junit/scala/tools/testing/AssertUtil.scala#L69-89 + // these asserts try to show that we're no longer holding references to the underlying map. + // it'd be nice to use something like + // https://github.com/scala/scala/blob/2.13.x/test/junit/scala/tools/testing/AssertUtil.scala + // (lines 69-89) assert(map.currentMap == null) assert(underlyingIt.upstream ne underlyingMapIterator) assert(underlyingIt.upstream.getClass != underlyingMapIteratorClass) - assert(underlyingIt.upstream.getClass.getEnclosingClass != classOf[AppendOnlyMap[_,_]]) + assert(underlyingIt.upstream.getClass.getEnclosingClass != classOf[AppendOnlyMap[_, _]]) - val next50Keys = for( _ <- 0 until 50) yield { - val (k,vs) = it.next + val next50Keys = for ( _ <- 0 until 50) yield { + val (k, vs) = it.next val sortedVs = vs.sorted assert(sortedVs.seq == (0 until 10).map(10 * k + _)) k From 536a7694086525ccbd5c43fb138ff525c1b945fc Mon Sep 17 00:00:00 2001 From: Eyal Farago Date: Sat, 19 May 2018 08:50:20 +0300 Subject: [PATCH 05/18] SPARK-22713__ExternalAppendOnlyMap_effective_spill: fix typo --- .../org/apache/spark/util/collection/ExternalAppendOnlyMap.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index 240f9cd8edba2..9a42a848637c8 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -109,7 +109,6 @@ class ExternalAppendOnlyMap[K, V, C]( // Write metrics private val writeMetrics: ShuffleWriteMetrics = new ShuffleWriteMetrics() - private val writeMetrics: ShuffleWriteMetrics = new ShuffleWriteMetrics() // Peak size of the in-memory map observed so far, in bytes private var _peakMemoryUsedBytes: Long = 0L From 589b423bf3828055b6431d463f6a7fbb10a9fa2e Mon Sep 17 00:00:00 2001 From: Eyal Farago Date: Sun, 20 May 2018 11:51:48 +0300 Subject: [PATCH 06/18] SPARK-22713__ExternalAppendOnlyMap_effective_spill: introduce a test showing that an exhausted iterator still reffers the underlying map. --- .../ExternalAppendOnlyMapSuite.scala | 44 +++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala index 4b5e0a1280ca1..78ddb327b7de6 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala @@ -464,6 +464,50 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext{ assert(keys == (0 until 100)) } + test("drop all references to the underlying map once the iterator is exhausted") { + val size = 1000 + val conf = createSparkConf(loadDefaults = true) + sc = new SparkContext("local-cluster[1,1,1024]", "test", conf) + val map = createExternalMap[Int] + + map.insertAll((0 until size).iterator.map(i => (i / 10, i))) + assert(map.numSpills == 0, "map was not supposed to spill") + + val it = map.iterator + assert( it.isInstanceOf[CompletionIterator[_, _]]) + val underlyingIt = map.readingIterator + assert( underlyingIt != null ) + val underlyingMapIterator = underlyingIt.upstream + assert(underlyingMapIterator != null) + val underlyingMapIteratorClass = underlyingMapIterator.getClass + assert(underlyingMapIteratorClass.getEnclosingClass == classOf[AppendOnlyMap[_, _]]) + + val underlyingMap = map.currentMap + assert(underlyingMap != null) + + val keys = it.map{ + case (k, vs) => + val sortedVs = vs.sorted + assert(sortedVs.seq == (0 until 10).map(10 * k + _)) + k + } + .toList + .sorted + + assert(it.isEmpty) + assert(keys == (0 until 100)) + + assert( map.numSpills == 0 ) + // these asserts try to show that we're no longer holding references to the underlying map. + // it'd be nice to use something like + // https://github.com/scala/scala/blob/2.13.x/test/junit/scala/tools/testing/AssertUtil.scala + // (lines 69-89) + assert(map.currentMap == null) + assert(underlyingIt.upstream ne underlyingMapIterator) + assert(underlyingIt.upstream.getClass != underlyingMapIteratorClass) + assert(underlyingIt.upstream.getClass.getEnclosingClass != classOf[AppendOnlyMap[_, _]]) + } + test("external aggregation updates peak execution memory") { val spillThreshold = 1000 val conf = createSparkConf(loadDefaults = false) From d5ee172f1e5116a703eb447a57d9b33c44900e02 Mon Sep 17 00:00:00 2001 From: Eyal Farago Date: Sun, 20 May 2018 12:10:33 +0300 Subject: [PATCH 07/18] SPARK-22713__ExternalAppendOnlyMap_effective_spill: introduce method SpillableIterator.toCompletionIterator and use that instead of 'maually' creating the completion iterator. also introduced SpillableIterator.destoy which removes the reference to the upstream iterator and calls freeCurrentMap(). --- .../collection/ExternalAppendOnlyMap.scala | 20 +++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index 9a42a848637c8..5cd96e07b7882 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -273,7 +273,7 @@ class ExternalAppendOnlyMap[K, V, C]( */ def destructiveIterator(inMemoryIterator: Iterator[(K, C)]): Iterator[(K, C)] = { readingIterator = new SpillableIterator(inMemoryIterator) - readingIterator + readingIterator.toCompletionIterator } /** @@ -286,8 +286,7 @@ class ExternalAppendOnlyMap[K, V, C]( "ExternalAppendOnlyMap.iterator is destructive and should only be called once.") } if (spilledMaps.isEmpty) { - CompletionIterator[(K, C), Iterator[(K, C)]]( - destructiveIterator(currentMap.iterator), freeCurrentMap()) + destructiveIterator(currentMap.iterator) } else { new ExternalIterator() } @@ -311,8 +310,8 @@ class ExternalAppendOnlyMap[K, V, C]( // Input streams are derived both from the in-memory map and spilled maps on disk // The in-memory map is sorted in place, while the spilled maps are already in sorted order - private val sortedMap = CompletionIterator[(K, C), Iterator[(K, C)]](destructiveIterator( - currentMap.destructiveSortedIterator(keyComparator)), freeCurrentMap()) + private val sortedMap = destructiveIterator( + currentMap.destructiveSortedIterator(keyComparator)) private val inputStreams = (Seq(sortedMap) ++ spilledMaps).map(it => it.buffered) inputStreams.foreach { it => @@ -600,6 +599,15 @@ class ExternalAppendOnlyMap[K, V, C]( } } + def destroy() : Unit = { + freeCurrentMap() + upstream = Iterator.empty + } + + def toCompletionIterator: CompletionIterator[(K, C), SpillableIterator] = { + CompletionIterator[(K, C), SpillableIterator](this, this.destroy ) + } + def readNext(): (K, C) = SPILL_LOCK.synchronized { if (upstream.hasNext) { upstream.next() @@ -635,7 +643,7 @@ private[spark] object ExternalAppendOnlyMap { } /** - * A comparator which sorts arbitrary keys based on their hash codes. + * A comparator which sorts arbitrary keys bas on their hash codes. */ private class HashComparator[K] extends Comparator[K] { def compare(key1: K, key2: K): Int = { From 1d1ddcefec8c8cbd8d9cd1dc5ded1eb989155233 Mon Sep 17 00:00:00 2001 From: Eyal Farago Date: Tue, 22 May 2018 10:18:55 +0300 Subject: [PATCH 08/18] SPARK-22713__ExternalAppendOnlyMap_effective_spill: address comments by @advancedxy --- .../apache/spark/util/collection/ExternalAppendOnlyMap.scala | 5 +++-- .../spark/util/collection/ExternalAppendOnlyMapSuite.scala | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index 5cd96e07b7882..1fde8cf4e6f17 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -599,11 +599,12 @@ class ExternalAppendOnlyMap[K, V, C]( } } - def destroy() : Unit = { + private def destroy() : Unit = { freeCurrentMap() upstream = Iterator.empty } + private[ExternalAppendOnlyMap] def toCompletionIterator: CompletionIterator[(K, C), SpillableIterator] = { CompletionIterator[(K, C), SpillableIterator](this, this.destroy ) } @@ -643,7 +644,7 @@ private[spark] object ExternalAppendOnlyMap { } /** - * A comparator which sorts arbitrary keys bas on their hash codes. + * A comparator which sorts arbitrary keys based on their hash codes. */ private class HashComparator[K] extends Comparator[K] { def compare(key1: K, key2: K): Int = { diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala index 78ddb327b7de6..f406618b1bbf3 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala @@ -25,7 +25,7 @@ import org.apache.spark.io.CompressionCodec import org.apache.spark.memory.MemoryTestingUtils import org.apache.spark.util.CompletionIterator -class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext{ +class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext { import TestUtils.{assertNotSpilled, assertSpilled} private val allCompressionCodecs = CompressionCodec.ALL_COMPRESSION_CODECS From e3c61fd596cdb6eb6b54a6eee0004ca79cb553af Mon Sep 17 00:00:00 2001 From: Eyal Farago Date: Wed, 23 May 2018 22:50:18 +0300 Subject: [PATCH 09/18] SPARK-22713__ExternalAppendOnlyMap_effective_spill: address comments by @cloud-fan --- .../spark/util/collection/ExternalAppendOnlyMapSuite.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala index f406618b1bbf3..7055f90a3e6ce 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala @@ -415,7 +415,7 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext { sc.stop() } - test("spill during iteration") { + test("SPARK-22713 spill during iteration leaks internal map") { val size = 1000 val conf = createSparkConf(loadDefaults = true) sc = new SparkContext("local-cluster[1,1,1024]", "test", conf) @@ -425,7 +425,7 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext { assert(map.numSpills == 0, "map was not supposed to spill") val it = map.iterator - assert( it.isInstanceOf[CompletionIterator[_, _]]) + assert(it.isInstanceOf[CompletionIterator[_, _]]) val underlyingIt = map.readingIterator assert( underlyingIt != null ) val underlyingMapIterator = underlyingIt.upstream @@ -508,7 +508,7 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext { assert(underlyingIt.upstream.getClass.getEnclosingClass != classOf[AppendOnlyMap[_, _]]) } - test("external aggregation updates peak execution memory") { + test("SPARK-22713 external aggregation updates peak execution memory") { val spillThreshold = 1000 val conf = createSparkConf(loadDefaults = false) .set("spark.shuffle.spill.numElementsForceSpillThreshold", spillThreshold.toString) From bc7dc11383db8370f755a058f4b908588f93edc8 Mon Sep 17 00:00:00 2001 From: Eyal Farago Date: Thu, 24 May 2018 14:06:03 +0300 Subject: [PATCH 10/18] SPARK-22713__ExternalAppendOnlyMap_effective_spill: address some more comments by cloud-fan. --- .../collection/ExternalAppendOnlyMapSuite.scala | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala index 7055f90a3e6ce..2387ab2e878ab 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala @@ -431,6 +431,8 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext { val underlyingMapIterator = underlyingIt.upstream assert(underlyingMapIterator != null) val underlyingMapIteratorClass = underlyingMapIterator.getClass + // org.apache.spark.util.collection.AppendOnlyMap.destructiveSortedIterator returns + // an instance of an annonymous Iterator class. assert(underlyingMapIteratorClass.getEnclosingClass == classOf[AppendOnlyMap[_, _]]) val underlyingMap = map.currentMap @@ -451,7 +453,9 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext { assert(map.currentMap == null) assert(underlyingIt.upstream ne underlyingMapIterator) assert(underlyingIt.upstream.getClass != underlyingMapIteratorClass) - assert(underlyingIt.upstream.getClass.getEnclosingClass != classOf[AppendOnlyMap[_, _]]) + assert(underlyingIt.upstream.getClass eq + util.Utils.classForName( + "org.apache.spark.util.collection.ExternalAppendOnlyMap$DiskMapIterator")) val next50Keys = for ( _ <- 0 until 50) yield { val (k, vs) = it.next @@ -474,12 +478,14 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext { assert(map.numSpills == 0, "map was not supposed to spill") val it = map.iterator - assert( it.isInstanceOf[CompletionIterator[_, _]]) + assert(it.isInstanceOf[CompletionIterator[_, _]]) val underlyingIt = map.readingIterator - assert( underlyingIt != null ) + assert(underlyingIt != null ) val underlyingMapIterator = underlyingIt.upstream assert(underlyingMapIterator != null) val underlyingMapIteratorClass = underlyingMapIterator.getClass + // org.apache.spark.util.collection.AppendOnlyMap.destructiveSortedIterator returns + // an instance of an annonymous Iterator class. assert(underlyingMapIteratorClass.getEnclosingClass == classOf[AppendOnlyMap[_, _]]) val underlyingMap = map.currentMap @@ -497,7 +503,7 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext { assert(it.isEmpty) assert(keys == (0 until 100)) - assert( map.numSpills == 0 ) + assert(map.numSpills == 0 ) // these asserts try to show that we're no longer holding references to the underlying map. // it'd be nice to use something like // https://github.com/scala/scala/blob/2.13.x/test/junit/scala/tools/testing/AssertUtil.scala @@ -505,7 +511,7 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext { assert(map.currentMap == null) assert(underlyingIt.upstream ne underlyingMapIterator) assert(underlyingIt.upstream.getClass != underlyingMapIteratorClass) - assert(underlyingIt.upstream.getClass.getEnclosingClass != classOf[AppendOnlyMap[_, _]]) + assert(underlyingIt.upstream eq Iterator.empty) } test("SPARK-22713 external aggregation updates peak execution memory") { From 807032dcded2d7ec9b879176b7c5116df0f424ad Mon Sep 17 00:00:00 2001 From: Eyal Farago Date: Thu, 24 May 2018 16:45:45 +0300 Subject: [PATCH 11/18] SPARK-22713__ExternalAppendOnlyMap_effective_spill: few more comments by cloud-fan. --- .../util/collection/ExternalAppendOnlyMapSuite.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala index 2387ab2e878ab..671a7888390fd 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala @@ -427,7 +427,7 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext { val it = map.iterator assert(it.isInstanceOf[CompletionIterator[_, _]]) val underlyingIt = map.readingIterator - assert( underlyingIt != null ) + assert(underlyingIt != null) val underlyingMapIterator = underlyingIt.upstream assert(underlyingMapIterator != null) val underlyingMapIteratorClass = underlyingMapIterator.getClass @@ -444,7 +444,7 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext { assert(sortedVs.seq == (0 until 10).map(10 * k + _)) k } - assert( map.numSpills == 0 ) + assert(map.numSpills == 0) map.spill(Long.MaxValue, null) // these asserts try to show that we're no longer holding references to the underlying map. // it'd be nice to use something like @@ -480,7 +480,7 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext { val it = map.iterator assert(it.isInstanceOf[CompletionIterator[_, _]]) val underlyingIt = map.readingIterator - assert(underlyingIt != null ) + assert(underlyingIt != null) val underlyingMapIterator = underlyingIt.upstream assert(underlyingMapIterator != null) val underlyingMapIteratorClass = underlyingMapIterator.getClass @@ -503,7 +503,7 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext { assert(it.isEmpty) assert(keys == (0 until 100)) - assert(map.numSpills == 0 ) + assert(map.numSpills == 0) // these asserts try to show that we're no longer holding references to the underlying map. // it'd be nice to use something like // https://github.com/scala/scala/blob/2.13.x/test/junit/scala/tools/testing/AssertUtil.scala From 621bd23dcae5f8f81be15c6094f35242517b29c4 Mon Sep 17 00:00:00 2001 From: Eyal Farago Date: Fri, 25 May 2018 09:52:24 +0300 Subject: [PATCH 12/18] SPARK-22713__ExternalAppendOnlyMap_effective_spill: address yet another comment from @cloud-fan. --- .../org/apache/spark/util/collection/ExternalAppendOnlyMap.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index 1fde8cf4e6f17..176294051a2b5 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -604,7 +604,6 @@ class ExternalAppendOnlyMap[K, V, C]( upstream = Iterator.empty } - private[ExternalAppendOnlyMap] def toCompletionIterator: CompletionIterator[(K, C), SpillableIterator] = { CompletionIterator[(K, C), SpillableIterator](this, this.destroy ) } From a2e78e2c8ef83c4945319e8d02145cdd21b2f110 Mon Sep 17 00:00:00 2001 From: Eyal Farago Date: Sun, 27 May 2018 14:17:47 +0300 Subject: [PATCH 13/18] SPARK-22713__ExternalAppendOnlyMap_effective_spill__weak_ref_test: rewrite the first test using WeakReference. --- .../collection/ExternalAppendOnlyMapSuite.scala | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala index 748eebbd8021a..b861f2783a14e 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala @@ -24,6 +24,8 @@ import org.apache.spark.io.CompressionCodec import org.apache.spark.memory.MemoryTestingUtils import org.apache.spark.util.CompletionIterator +import scala.ref.WeakReference + class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext{ import TestUtils.{assertNotSpilled, assertSpilled} @@ -425,15 +427,9 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext{ val it = map.iterator assert( it.isInstanceOf[CompletionIterator[_, _]]) - val underlyingIt = map.readingIterator - assert( underlyingIt != null ) - val underlyingMapIterator = underlyingIt.upstream - assert(underlyingMapIterator != null) - val underlyingMapIteratorClass = underlyingMapIterator.getClass - assert(underlyingMapIteratorClass.getEnclosingClass == classOf[AppendOnlyMap[_,_]]) - val underlyingMap = map.currentMap - assert(underlyingMap != null) + val underlyingMapRef = WeakReference(map.currentMap) + assert(underlyingMapRef.get.nonEmpty) val first50Keys = for( _ <- 0 until 50) yield { val (k,vs) = it.next @@ -447,9 +443,8 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext{ //these asserts basically try to show that we're no longer holding references to the underlying AppendOnlyMap //it'd be nice to use something like https://github.com/scala/scala/blob/2.13.x/test/junit/scala/tools/testing/AssertUtil.scala#L69-89 assert(map.currentMap == null) - assert(underlyingIt.upstream ne underlyingMapIterator) - assert(underlyingIt.upstream.getClass != underlyingMapIteratorClass) - assert(underlyingIt.upstream.getClass.getEnclosingClass != classOf[AppendOnlyMap[_,_]]) + System.gc() + assert(underlyingMapRef.get.isEmpty) val next50Keys = for( _ <- 0 until 50) yield { val (k,vs) = it.next From 686b4d9247b7e9d2464bbe2a2d0efa345e49fa21 Mon Sep 17 00:00:00 2001 From: Eyal Farago Date: Sat, 16 Jun 2018 21:35:25 +0300 Subject: [PATCH 14/18] SPARK-22713__ExternalAppendOnlyMap_effective_spill__weak_ref_test: failed getting the weak ref testing based approach to work --- .../ExternalAppendOnlyMapSuite.scala | 49 +++++++++++++------ 1 file changed, 33 insertions(+), 16 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala index d06e298751efc..0bec9d6b67f02 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala @@ -17,17 +17,23 @@ package org.apache.spark.util.collection +import java.util.Objects + import scala.collection.mutable.ArrayBuffer import scala.ref.WeakReference - import org.apache.spark._ import org.apache.spark.internal.config._ import org.apache.spark.io.CompressionCodec import org.apache.spark.memory.MemoryTestingUtils import org.apache.spark.util.CompletionIterator +import org.scalatest.Matchers +import org.scalatest.concurrent.Eventually +import org.scalatest.concurrent.PatienceConfiguration.Timeout +import scala.concurrent.duration.{Duration, FiniteDuration} -class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext{ + +class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext with Eventually with Matchers{ import TestUtils.{assertNotSpilled, assertSpilled} private val allCompressionCodecs = CompressionCodec.ALL_COMPRESSION_CODECS @@ -444,9 +450,16 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext{ // it'd be nice to use something like // https://github.com/scala/scala/blob/2.13.x/test/junit/scala/tools/testing/AssertUtil.scala // (lines 69-89) - assert(map.currentMap == null) - System.gc() - assert(underlyingMapRef.get.isEmpty) + //assert(map.currentMap == null) +// eventually (Timeout(FiniteDuration(500, scala.concurrent.duration.SECONDS))){ +// System.gc() +// underlyingMapRef.get should be (empty) +// } + while(underlyingMapRef.get.nonEmpty){ + System.gc() + Thread.sleep(5000) + } + val next50Keys = for ( _ <- 0 until 50) yield { val (k, vs) = it.next @@ -468,17 +481,12 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext{ map.insertAll((0 until size).iterator.map(i => (i / 10, i))) assert(map.numSpills == 0, "map was not supposed to spill") + val underlyingMapRef = WeakReference(map.currentMap) + assert(underlyingMapRef.get.nonEmpty) + val it = map.iterator assert( it.isInstanceOf[CompletionIterator[_, _]]) - val underlyingIt = map.readingIterator - assert( underlyingIt != null ) - val underlyingMapIterator = underlyingIt.upstream - assert(underlyingMapIterator != null) - val underlyingMapIteratorClass = underlyingMapIterator.getClass - assert(underlyingMapIteratorClass.getEnclosingClass == classOf[AppendOnlyMap[_, _]]) - val underlyingMap = map.currentMap - assert(underlyingMap != null) val keys = it.map{ case (k, vs) => @@ -498,9 +506,18 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext{ // https://github.com/scala/scala/blob/2.13.x/test/junit/scala/tools/testing/AssertUtil.scala // (lines 69-89) assert(map.currentMap == null) - assert(underlyingIt.upstream ne underlyingMapIterator) - assert(underlyingIt.upstream.getClass != underlyingMapIteratorClass) - assert(underlyingIt.upstream.getClass.getEnclosingClass != classOf[AppendOnlyMap[_, _]]) + +// eventually(Timeout(FiniteDuration(5, scala.concurrent.duration.SECONDS))){ +// System.gc() +// underlyingMapRef.get should be (empty) +// } + + while(underlyingMapRef.get.nonEmpty){ + System.gc() + Thread.sleep(5000) + } + + assert(it.toList.isEmpty) } test("external aggregation updates peak execution memory") { From 4e445856d40ed914fb23273b535c83ea6d07ac5c Mon Sep 17 00:00:00 2001 From: Eyal Farago Date: Sat, 11 Aug 2018 21:36:16 +0300 Subject: [PATCH 15/18] SPARK-22713__ExternalAppendOnlyMap_effective_spill__weak_ref_test: use wek ref to assert the map is no longer reachable. --- .../ExternalAppendOnlyMapSuite.scala | 52 +++++++++++-------- 1 file changed, 29 insertions(+), 23 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala index 0bec9d6b67f02..0305ff388c9fa 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala @@ -21,19 +21,20 @@ import java.util.Objects import scala.collection.mutable.ArrayBuffer import scala.ref.WeakReference + +import org.scalatest.Matchers +import org.scalatest.concurrent.Eventually + import org.apache.spark._ import org.apache.spark.internal.config._ import org.apache.spark.io.CompressionCodec import org.apache.spark.memory.MemoryTestingUtils import org.apache.spark.util.CompletionIterator -import org.scalatest.Matchers -import org.scalatest.concurrent.Eventually -import org.scalatest.concurrent.PatienceConfiguration.Timeout - -import scala.concurrent.duration.{Duration, FiniteDuration} - -class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext with Eventually with Matchers{ +class ExternalAppendOnlyMapSuite extends SparkFunSuite + with LocalSparkContext + with Eventually + with Matchers{ import TestUtils.{assertNotSpilled, assertSpilled} private val allCompressionCodecs = CompressionCodec.ALL_COMPRESSION_CODECS @@ -436,7 +437,12 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext wi assert( it.isInstanceOf[CompletionIterator[_, _]]) val underlyingMapRef = WeakReference(map.currentMap) - assert(underlyingMapRef.get.nonEmpty) + + { + // direct asserts introduced some macro generated code that held a reference to the map + val tmpIsNull = null == underlyingMapRef.get.orNull + assert(!tmpIsNull) + } val first50Keys = for ( _ <- 0 until 50) yield { val (k, vs) = it.next @@ -450,14 +456,12 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext wi // it'd be nice to use something like // https://github.com/scala/scala/blob/2.13.x/test/junit/scala/tools/testing/AssertUtil.scala // (lines 69-89) - //assert(map.currentMap == null) -// eventually (Timeout(FiniteDuration(500, scala.concurrent.duration.SECONDS))){ -// System.gc() -// underlyingMapRef.get should be (empty) -// } - while(underlyingMapRef.get.nonEmpty){ + // assert(map.currentMap == null) + eventually{ System.gc() - Thread.sleep(5000) + // direct asserts introduced some macro generated code that held a reference to the map + val tmpIsNull = null == underlyingMapRef.get.orNull + tmpIsNull } @@ -482,7 +486,12 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext wi assert(map.numSpills == 0, "map was not supposed to spill") val underlyingMapRef = WeakReference(map.currentMap) - assert(underlyingMapRef.get.nonEmpty) + + { + // direct asserts introduced some macro generated code that held a reference to the map + val tmpIsNull = null == underlyingMapRef.get.orNull + assert(!tmpIsNull) + } val it = map.iterator assert( it.isInstanceOf[CompletionIterator[_, _]]) @@ -507,14 +516,11 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext wi // (lines 69-89) assert(map.currentMap == null) -// eventually(Timeout(FiniteDuration(5, scala.concurrent.duration.SECONDS))){ -// System.gc() -// underlyingMapRef.get should be (empty) -// } - - while(underlyingMapRef.get.nonEmpty){ + eventually{ System.gc() - Thread.sleep(5000) + // direct asserts introduced some macro generated code that held a reference to the map + val tmpIsNull = null == underlyingMapRef.get.orNull + tmpIsNull } assert(it.toList.isEmpty) From 0cf8913a63dbfd89bbea8a874f28e45dcbb02a29 Mon Sep 17 00:00:00 2001 From: Eyal Farago Date: Sat, 11 Aug 2018 22:40:38 +0300 Subject: [PATCH 16/18] SPARK-22713__ExternalAppendOnlyMap_effective_spill__weak_ref_test: fix tests to effectively test for non-reachabillity of the internal map. --- .../spark/util/collection/ExternalAppendOnlyMapSuite.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala index 0305ff388c9fa..a2c675d50d09c 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala @@ -461,7 +461,7 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite System.gc() // direct asserts introduced some macro generated code that held a reference to the map val tmpIsNull = null == underlyingMapRef.get.orNull - tmpIsNull + assert(tmpIsNull) } @@ -517,10 +517,11 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite assert(map.currentMap == null) eventually{ + Thread.sleep(500) System.gc() // direct asserts introduced some macro generated code that held a reference to the map val tmpIsNull = null == underlyingMapRef.get.orNull - tmpIsNull + assert(tmpIsNull) } assert(it.toList.isEmpty) From 11b5bb44ba9085da1a647230971b56b46e7a8d40 Mon Sep 17 00:00:00 2001 From: Eyal Farago Date: Sat, 11 Aug 2018 22:49:17 +0300 Subject: [PATCH 17/18] SPARK-22713__ExternalAppendOnlyMap_effective_spill__weak_ref_test: fix after merger. --- .../spark/util/collection/ExternalAppendOnlyMapSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala index ae003c32b6b36..09ef3cdc6f657 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala @@ -435,7 +435,6 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite val it = map.iterator assert(it.isInstanceOf[CompletionIterator[_, _]]) - assert(underlyingIt != null) // org.apache.spark.util.collection.AppendOnlyMap.destructiveSortedIterator returns // an instance of an annonymous Iterator class. From 855854af4762057438b996c58639a38c773ac0bb Mon Sep 17 00:00:00 2001 From: Eyal Farago Date: Mon, 13 Aug 2018 08:54:09 +0300 Subject: [PATCH 18/18] SPARK-22713__ExternalAppendOnlyMap_effective_spill: address styling issues commented by @cloud-fan . --- .../util/collection/ExternalAppendOnlyMap.scala | 14 ++++---------- .../collection/ExternalAppendOnlyMapSuite.scala | 4 ++-- 2 files changed, 6 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index 23a0d751280f4..19ff109b673e1 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -117,10 +117,7 @@ class ExternalAppendOnlyMap[K, V, C]( private val keyComparator = new HashComparator[K] private val ser = serializer.newInstance() - /** - * Exposed for testing - */ - @volatile private[collection] var readingIterator: SpillableIterator = null + @volatile private var readingIterator: SpillableIterator = null /** * Number of files this map has spilled so far. @@ -573,10 +570,7 @@ class ExternalAppendOnlyMap[K, V, C]( context.addTaskCompletionListener[Unit](context => cleanup()) } - /** - * Exposed for testing - */ - private[collection] class SpillableIterator(var upstream: Iterator[(K, C)]) + private class SpillableIterator(var upstream: Iterator[(K, C)]) extends Iterator[(K, C)] { private val SPILL_LOCK = new Object() @@ -599,13 +593,13 @@ class ExternalAppendOnlyMap[K, V, C]( } } - private def destroy() : Unit = { + private def destroy(): Unit = { freeCurrentMap() upstream = Iterator.empty } def toCompletionIterator: CompletionIterator[(K, C), SpillableIterator] = { - CompletionIterator[(K, C), SpillableIterator](this, this.destroy ) + CompletionIterator[(K, C), SpillableIterator](this, this.destroy) } def readNext(): (K, C) = SPILL_LOCK.synchronized { diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala index 09ef3cdc6f657..d542ba0b6640d 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala @@ -459,7 +459,7 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite // https://github.com/scala/scala/blob/2.13.x/test/junit/scala/tools/testing/AssertUtil.scala // (lines 69-89) // assert(map.currentMap == null) - eventually{ + eventually { System.gc() // direct asserts introduced some macro generated code that held a reference to the map val tmpIsNull = null == underlyingMapRef.get.orNull @@ -518,7 +518,7 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite // (lines 69-89) assert(map.currentMap == null) - eventually{ + eventually { Thread.sleep(500) System.gc() // direct asserts introduced some macro generated code that held a reference to the map