Skip to content
Closed
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
1c4a6af
SPARK-22713__ExternalAppendOnlyMap_effective_spill: add failing test.
May 18, 2018
82591e6
SPARK-22713__ExternalAppendOnlyMap_effective_spill: fix the issue by …
May 18, 2018
72f6386
SPARK-22713__ExternalAppendOnlyMap_effective_spill: fix scalastyle is…
May 19, 2018
48224d9
SPARK-22713__ExternalAppendOnlyMap_effective_spill: some more styling…
May 19, 2018
536a769
SPARK-22713__ExternalAppendOnlyMap_effective_spill: fix typo
May 19, 2018
589b423
SPARK-22713__ExternalAppendOnlyMap_effective_spill: introduce a test …
May 20, 2018
d5ee172
SPARK-22713__ExternalAppendOnlyMap_effective_spill: introduce method …
May 20, 2018
1d1ddce
SPARK-22713__ExternalAppendOnlyMap_effective_spill: address comments …
May 22, 2018
e3c61fd
SPARK-22713__ExternalAppendOnlyMap_effective_spill: address comments …
May 23, 2018
bc7dc11
SPARK-22713__ExternalAppendOnlyMap_effective_spill: address some more…
May 24, 2018
807032d
SPARK-22713__ExternalAppendOnlyMap_effective_spill: few more comments…
May 24, 2018
621bd23
SPARK-22713__ExternalAppendOnlyMap_effective_spill: address yet anoth…
May 25, 2018
a2e78e2
SPARK-22713__ExternalAppendOnlyMap_effective_spill__weak_ref_test: re…
May 27, 2018
4366eb4
Merge commit '589b423' into SPARK-22713__ExternalAppendOnlyMap_effect…
May 27, 2018
686b4d9
SPARK-22713__ExternalAppendOnlyMap_effective_spill__weak_ref_test: fa…
Jun 16, 2018
4e44585
SPARK-22713__ExternalAppendOnlyMap_effective_spill__weak_ref_test: us…
Aug 11, 2018
0cf8913
SPARK-22713__ExternalAppendOnlyMap_effective_spill__weak_ref_test: fi…
Aug 11, 2018
9eb5600
Merge branch 'SPARK-22713__ExternalAppendOnlyMap_effective_spill' int…
Aug 11, 2018
11b5bb4
SPARK-22713__ExternalAppendOnlyMap_effective_spill__weak_ref_test: fi…
Aug 11, 2018
1bfca67
Merge branch 'SPARK-22713__ExternalAppendOnlyMap_effective_spill__wea…
Aug 11, 2018
25be99b
Merge branch 'master' into SPARK-22713__ExternalAppendOnlyMap_effecti…
Aug 11, 2018
855854a
SPARK-22713__ExternalAppendOnlyMap_effective_spill: address styling i…
Aug 13, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,10 @@ class ExternalAppendOnlyMap[K, V, C](
this(createCombiner, mergeValue, mergeCombiners, serializer, blockManager, TaskContext.get())
}

@volatile private var currentMap = new SizeTrackingAppendOnlyMap[K, C]
/**
* Exposed for testing
*/
@volatile private[collection] var currentMap = new SizeTrackingAppendOnlyMap[K, C]
private val spilledMaps = new ArrayBuffer[DiskMapIterator]
private val sparkConf = SparkEnv.get.conf
private val diskBlockManager = blockManager.diskBlockManager
Expand Down Expand Up @@ -114,7 +117,10 @@ class ExternalAppendOnlyMap[K, V, C](
private val keyComparator = new HashComparator[K]
private val ser = serializer.newInstance()

@volatile private var readingIterator: SpillableIterator = null
/**
* Exposed for testing
*/
@volatile private[collection] var readingIterator: SpillableIterator = null

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not exposed in the test.


/**
* Number of files this map has spilled so far.
Expand Down Expand Up @@ -267,7 +273,7 @@ class ExternalAppendOnlyMap[K, V, C](
*/
def destructiveIterator(inMemoryIterator: Iterator[(K, C)]): Iterator[(K, C)] = {
readingIterator = new SpillableIterator(inMemoryIterator)
readingIterator
readingIterator.toCompletionIterator

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change the original behavior of destructiveIterator . I'd prefer do like this:

CompletionIterator[(K, C), Iterator[(K, C)]](
destructiveIterator(currentMap.iterator), readingIterator.destroy)

which keep compatibility with current code, and do not introduce unnecessary function.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What behavior does it change? Your suggested codes does exactly the same but is less streamlined and relies on an intermediate value (fortunately it's already a member variable)

@Ngone51 Ngone51 May 22, 2018

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

destructiveIterator should just return a destructive iterator (especially for map buffer) as it's function name implies, and it is none business of CompletionIterator . And developers should be free to define the complete function for the returned destructive iterator, in case of we want a different one somewhere else in future.

Your suggested codes does exactly the same but is less streamlined

I don't think this little change will pay a huge influence on streamlined .

and relies on an intermediate value (fortunately it's already a member variable)

The current fix leads to this, not me. And even this variable is not a member variable, we can define a temp local variable. It's not a big deal.

}

/**
Expand All @@ -280,8 +286,7 @@ class ExternalAppendOnlyMap[K, V, C](
"ExternalAppendOnlyMap.iterator is destructive and should only be called once.")
}
if (spilledMaps.isEmpty) {
CompletionIterator[(K, C), Iterator[(K, C)]](
destructiveIterator(currentMap.iterator), freeCurrentMap())
destructiveIterator(currentMap.iterator)
} else {
new ExternalIterator()
}
Expand All @@ -305,8 +310,8 @@ class ExternalAppendOnlyMap[K, V, C](

// Input streams are derived both from the in-memory map and spilled maps on disk
// The in-memory map is sorted in place, while the spilled maps are already in sorted order
private val sortedMap = CompletionIterator[(K, C), Iterator[(K, C)]](destructiveIterator(
currentMap.destructiveSortedIterator(keyComparator)), freeCurrentMap())
private val sortedMap = destructiveIterator(
currentMap.destructiveSortedIterator(keyComparator))

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These two lines can be merged into one line?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unfortunately no, scala-style enforces a max of 100 chars per line

private val inputStreams = (Seq(sortedMap) ++ spilledMaps).map(it => it.buffered)

inputStreams.foreach { it =>
Expand Down Expand Up @@ -568,13 +573,14 @@ class ExternalAppendOnlyMap[K, V, C](
context.addTaskCompletionListener(context => cleanup())
}

private[this] class SpillableIterator(var upstream: Iterator[(K, C)])
/**
* Exposed for testing
*/
private[collection] class SpillableIterator(var upstream: Iterator[(K, C)])

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

extends Iterator[(K, C)] {

private val SPILL_LOCK = new Object()

private var nextUpstream: Iterator[(K, C)] = null

private var cur: (K, C) = readNext()

private var hasSpilled: Boolean = false
Expand All @@ -585,17 +591,25 @@ class ExternalAppendOnlyMap[K, V, C](
} else {
logInfo(s"Task ${context.taskAttemptId} force spilling in-memory map to disk and " +
s"it will release ${org.apache.spark.util.Utils.bytesToString(getUsed())} memory")
nextUpstream = spillMemoryIteratorToDisk(upstream)
val nextUpstream = spillMemoryIteratorToDisk(upstream)
assert(!upstream.hasNext)
hasSpilled = true
upstream = nextUpstream

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the change means we should reassign upstream (which eliminates reference to currentMap) after spill immediately, otherwise, we may hit OOM (e.g. never readNext() after spill - is this the real cause for JIRA issue?) ?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Basically yes, according to my understanding of the code this should have happened on the subsequent hasNext/next call. However according to the analysis in the jira the iterator kept holding this reference, my guess: at this point the entire program started suffering lengthy GC pauses that got it into behaving as if under a deadlock,effectively leaving the ref in place (just a guess)

@JerryLead JerryLead May 20, 2018

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for fixing this issue. I think the potential solution is to change the upstream reference, but I have not tested if this change is sufficient and safe.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@JerryLead, I'd appreciate if you could test this.
One more thing that bugs me is that there's another case when the iterator no longer needs the upstream iterator/underlying map but still holds a reference to it:
When the iterator reach EOF its hasNext method returns false which causes the wrapping CompletionIterator to call the cleanup function which simply nulls the underlying map member variable in ExternalAppendOnlyMap, the iterator member is not nulled out so we end up with the CompletionIterator holding two paths to the upstrean iterator which leads to the underlying map: first it still holds a reference to the iterator itself, however it still holds a reference to the cleanup closure which refers the ExternalAppendOnlyMap which still refers to the current iterator which refers upstream...
This can be solven in one of two ways:
Simple way, when creating the completion iterator, provide a closure referring the iterator, not the ExternalAppendOnlyMap.
Thorough way, modify completion iterator to null out references after cleaning up.

Having that said, I'm not sure how long a completed iterator may be 'sitting' before being discarded so I'm not sure if this is worth fixing, especially using the thorough approach.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

destroy re-assigns upstream, once destroy is called, we should be fine?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan , the assumption here is that there are two references to the underlying map: the upstream iterator and the external map itself.
the destroy method first removes the ref via upstream-iterator than delegates to the method that clears the ref via the external map member (currentMap I think), so unless we've missed another ref we should be fine.

as I wrote above, I think there's a potentially more fundamental issue with CompletionIterator which keeps holding references via it's sub and completionFunction members , these might stall some objects from being collected and can be eliminated upon exhaustion of the iterator. there might be some more 'candidates' like LazyIterator and InterruptibleIterator, I think this desrves some more investigation.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan , do you think this is worth doing, I'm referring to the CompletionIterator delaying GC of the sub iterator and cleanup function (usually a closure referring to a larger collection).
if so, I'd open a separate JIRA+PR for this.

true
}
}

private def destroy() : Unit = {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: no space before :

freeCurrentMap()
upstream = Iterator.empty

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why empy, not null?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Safer, class remains usable if for some reason hasNext is called again, and this costs absolutely nothing.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

}

private[ExternalAppendOnlyMap]

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's pretty reasonable to have this method public.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm... the class itself is private (slightly relaxed to package private to ease testing) so I'm not sure what's the benefit in making the method public,
in any case I think that once we see the use case for making this method public we'd probably has to further change the iterator/external map classes.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's weird to see a class private method. I'd suggest just remove private[ExternalAppendOnlyMap]. spill is only called in ExternalAppendOnlyMap and it's public.

def toCompletionIterator: CompletionIterator[(K, C), SpillableIterator] = {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer private for this method

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

CompletionIterator[(K, C), SpillableIterator](this, this.destroy )

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: no space before )

}

def readNext(): (K, C) = SPILL_LOCK.synchronized {
if (nextUpstream != null) {
upstream = nextUpstream
nextUpstream = null
}
if (upstream.hasNext) {
upstream.next()
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.apache.spark._
import org.apache.spark.internal.config._
import org.apache.spark.io.CompressionCodec
import org.apache.spark.memory.MemoryTestingUtils
import org.apache.spark.util.CompletionIterator

class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
import TestUtils.{assertNotSpilled, assertSpilled}
Expand Down Expand Up @@ -414,6 +415,99 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
sc.stop()
}

test("spill during iteration") {

@Ngone51 Ngone51 May 22, 2018

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand what this test want to do. But it seems code without this PR could also pass it if everything goes normally. And I know it's a little hard to reflect the change by unit test. So, I'd prefer to leave some comments to explain the potential memory leak in source code above.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test was written BEFORE the actual fix and it did fail up untill the fix was in place. I do agree it's a bit clumsy and potential future changes may break the original intention of the test. I've referred a potential testing approach (currently limited to scala's source code) which couldn't be (easily) applied to this code base so I made a best effort to test this.
I agree this needs better documentation, I'll start be referring the issue in the test's name and will also add comments to the code.

val size = 1000
val conf = createSparkConf(loadDefaults = true)
sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
val map = createExternalMap[Int]

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about this

val mapRef = new WeakReference(map.currentMap)
... // trigger spilling
System.gc()
assert(mapRef.get == null)

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this requires using something like scalatest's eventually, don't you think?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AccumulatorSuite has a similar pattern and it works well(search System.gc). Maybe we don't need eventually?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Anyway eventually is availabe in Spark test, we can use it if needed.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan , I've tried the System.gc approach with and without eventually, and for no avail.
seems like scalatest's asserts use some black magic macro based voodoo to generate a local method/val (when I got about 10 classes deep into my quest, I gave up)
basically what I'm seeing in visualvm is the expected ref via the WeakReference and another surprising one via UnaryMacroBool.left.

it seems this sneaky ref was generated by the following assertion: assert(map.currentMap == null), but it could have easily been generated elsewhere.

@cloud-fan , @gatorsmile , can you please confirm if and how can we import the scala code? otherwise, can you think of an alternative approach for testing this?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan, how do you suggest to progress with this?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan , can we move on with this?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


map.insertAll((0 until size).iterator.map(i => (i / 10, i)))
assert(map.numSpills == 0, "map was not supposed to spill")

val it = map.iterator
assert( it.isInstanceOf[CompletionIterator[_, _]])

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: no space after assert(

val underlyingIt = map.readingIterator
assert( underlyingIt != null )

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assert(underlyingIt != null), we should not put space around. can you fix all of them?

val underlyingMapIterator = underlyingIt.upstream
assert(underlyingMapIterator != null)
val underlyingMapIteratorClass = underlyingMapIterator.getClass
assert(underlyingMapIteratorClass.getEnclosingClass == classOf[AppendOnlyMap[_, _]])

val underlyingMap = map.currentMap
assert(underlyingMap != null)

val first50Keys = for ( _ <- 0 until 50) yield {
val (k, vs) = it.next
val sortedVs = vs.sorted
assert(sortedVs.seq == (0 until 10).map(10 * k + _))
k
}
assert( map.numSpills == 0 )
map.spill(Long.MaxValue, null)
// these asserts try to show that we're no longer holding references to the underlying map.
// it'd be nice to use something like
// https://github.com/scala/scala/blob/2.13.x/test/junit/scala/tools/testing/AssertUtil.scala
// (lines 69-89)
assert(map.currentMap == null)
assert(underlyingIt.upstream ne underlyingMapIterator)
assert(underlyingIt.upstream.getClass != underlyingMapIteratorClass)
assert(underlyingIt.upstream.getClass.getEnclosingClass != classOf[AppendOnlyMap[_, _]])

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we want to prove we are no longer holding the reference, why do we check type here?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the underlying map's iterator is an anonymous class, this is the best I could come up with to check if the upstream iterator holds a ref to the underlying map.
@cloud-fan , do you have a better idea (I'm not 100% happy with this one)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we simply check assert(underlyingIt.upstream eq Iterator.empty)?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, we can in line 508 but not in this test.
in this test we look at the iterator immediately after a spill, at this point upstream is supposed to be replaced by a DiskMapIterator, I guess we can check for this directly (after relaxing its visibility to package private).

in line 508, we can simply compare with Iterator.empty


val next50Keys = for ( _ <- 0 until 50) yield {
val (k, vs) = it.next
val sortedVs = vs.sorted
assert(sortedVs.seq == (0 until 10).map(10 * k + _))
k
}
assert(!it.hasNext)
val keys = (first50Keys ++ next50Keys).sorted
assert(keys == (0 until 100))
}

test("drop all references to the underlying map once the iterator is exhausted") {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's also put the jira number in the test name.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

val size = 1000
val conf = createSparkConf(loadDefaults = true)
sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
val map = createExternalMap[Int]

map.insertAll((0 until size).iterator.map(i => (i / 10, i)))
assert(map.numSpills == 0, "map was not supposed to spill")

val it = map.iterator
assert( it.isInstanceOf[CompletionIterator[_, _]])
val underlyingIt = map.readingIterator
assert( underlyingIt != null )
val underlyingMapIterator = underlyingIt.upstream
assert(underlyingMapIterator != null)
val underlyingMapIteratorClass = underlyingMapIterator.getClass
assert(underlyingMapIteratorClass.getEnclosingClass == classOf[AppendOnlyMap[_, _]])

val underlyingMap = map.currentMap
assert(underlyingMap != null)

val keys = it.map{
case (k, vs) =>
val sortedVs = vs.sorted
assert(sortedVs.seq == (0 until 10).map(10 * k + _))
k
}
.toList
.sorted

assert(it.isEmpty)
assert(keys == (0 until 100))

assert( map.numSpills == 0 )
// these asserts try to show that we're no longer holding references to the underlying map.
// it'd be nice to use something like
// https://github.com/scala/scala/blob/2.13.x/test/junit/scala/tools/testing/AssertUtil.scala
// (lines 69-89)
assert(map.currentMap == null)
assert(underlyingIt.upstream ne underlyingMapIterator)
assert(underlyingIt.upstream.getClass != underlyingMapIteratorClass)
assert(underlyingIt.upstream.getClass.getEnclosingClass != classOf[AppendOnlyMap[_, _]])
}

test("external aggregation updates peak execution memory") {
val spillThreshold = 1000
val conf = createSparkConf(loadDefaults = false)
Expand Down