[SPARK-22713][CORE] ExternalAppendOnlyMap leaks when spilled during iteration#21369

Closed
eyalfa wants to merge 22 commits into apache:master from eyalfa:SPARK-22713__ExternalAppendOnlyMap_effective_spill

Conversation

@eyalfa

@eyalfa eyalfa commented May 19, 2018

What changes were proposed in this pull request?

This PR solves SPARK-22713, which describes a memory leak that occurs when an ExternalAppendOnlyMap is spilled during iteration (as opposed to insertion).

ExternalAppendOnlyMap's iterator supports spilling, but it kept a reference to the internal map (via an internal iterator) after spilling. The original code was apparently supposed to drop this reference on the next iteration, but according to the elaborate investigation described in the JIRA this didn't happen.
The fix is simply to replace the internal iterator immediately after spilling.
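The core of the fix can be illustrated with a toy, self-contained sketch (names mirror the PR, but this SpillableIterator is a hypothetical simplification, with an in-memory list standing in for the on-disk file):

```scala
// Simplified model of the leak and fix: before the fix, `upstream` kept
// pointing at the in-memory map's iterator even after its contents were
// spilled; the fix reassigns it right away so the map becomes unreachable.
class SpillableIterator[T](private var upstream: Iterator[T]) extends Iterator[T] {
  private var hasSpilled = false

  // Stand-in for spillMemoryIteratorToDisk: materializes the remaining
  // elements to simulate a disk-backed iterator.
  private def spillToDisk(it: Iterator[T]): Iterator[T] = it.toList.iterator

  def spill(): Unit = if (!hasSpilled) {
    val nextUpstream = spillToDisk(upstream)
    hasSpilled = true
    upstream = nextUpstream // the fix: drop the ref to the in-memory map immediately
  }

  override def hasNext: Boolean = upstream.hasNext
  override def next(): T = upstream.next()
}
```

After spill() is called, iteration continues transparently from the spilled data, and nothing in the iterator reaches back to the original in-memory source.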

How was this patch tested?

I've introduced a new test in ExternalAppendOnlyMapSuite; it asserts that neither the external map itself nor its iterator holds any reference to the internal map after a spill.
This approach required relaxing the access of some member variables and nested classes of ExternalAppendOnlyMap; these members are now package private and annotated with @VisibleForTesting.

@SparkQA

SparkQA commented May 19, 2018

Test build #90829 has finished for PR 21369 at commit 82591e6.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@eyalfa
Author

eyalfa commented May 19, 2018

@lianhuiwang, @davies, @hvanhovell can you please have a look?

@SparkQA

SparkQA commented May 19, 2018

Test build #90830 has finished for PR 21369 at commit 72f6386.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented May 19, 2018

Test build #90831 has finished for PR 21369 at commit 536a769.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@eyalfa
Author

eyalfa commented May 19, 2018

retest this please

@SparkQA

SparkQA commented May 19, 2018

Test build #90832 has finished for PR 21369 at commit 536a769.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

val nextUpstream = spillMemoryIteratorToDisk(upstream)
assert(!upstream.hasNext)
hasSpilled = true
upstream = nextUpstream
Member

Does the change mean we should reassign upstream (which eliminates the reference to currentMap) immediately after a spill, since otherwise we may hit an OOM (e.g. if readNext() is never called after the spill)? Is this the real cause of the JIRA issue?

Author

Basically yes. According to my understanding of the code, this should have happened on the subsequent hasNext/next call. However, according to the analysis in the JIRA, the iterator kept holding this reference. My guess: at this point the entire program started suffering lengthy GC pauses that made it behave as if under a deadlock, effectively leaving the ref in place (just a guess).

Contributor

@JerryLead JerryLead May 20, 2018


Thanks for fixing this issue. I think the potential solution is to change the upstream reference, but I have not tested whether this change is sufficient and safe.

Author

@JerryLead, I'd appreciate it if you could test this.
One more thing that bugs me: there's another case where the iterator no longer needs the upstream iterator/underlying map but still holds a reference to it.
When the iterator reaches EOF, its hasNext method returns false, which causes the wrapping CompletionIterator to call the cleanup function, which simply nulls the underlying map member variable in ExternalAppendOnlyMap. The iterator member is not nulled out, so the CompletionIterator ends up holding two paths to the upstream iterator (which leads to the underlying map): it still holds a reference to the iterator itself, and it also holds a reference to the cleanup closure, which refers to the ExternalAppendOnlyMap, which still refers to the current iterator, which refers to upstream...
This can be solved in one of two ways:
Simple way: when creating the completion iterator, provide a closure referring to the iterator, not the ExternalAppendOnlyMap.
Thorough way: modify CompletionIterator to null out references after cleaning up.

Having said that, I'm not sure how long a completed iterator may be 'sitting' before being discarded, so I'm not sure this is worth fixing, especially via the thorough approach.
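The 'thorough way' can be sketched as follows. This NullingCompletionIterator is a hypothetical simplification, not Spark's actual CompletionIterator, shown only to illustrate dropping both reference paths on exhaustion:

```scala
// Once exhausted, this iterator runs the completion function and then drops
// both of its reference paths: the sub-iterator and the closure itself.
class NullingCompletionIterator[A](
    private var sub: Iterator[A],
    private var completion: () => Unit) extends Iterator[A] {

  override def next(): A = sub.next()

  override def hasNext: Boolean = {
    val more = sub != null && sub.hasNext
    if (!more && sub != null) {
      completion()
      sub = null        // path 1: the sub-iterator
      completion = null // path 2: the closure and everything it captures
    }
    more
  }
}
```

With this design, once hasNext first returns false, neither the upstream data nor anything captured by the cleanup closure is kept reachable through the completion iterator.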

Contributor

destroy re-assigns upstream; once destroy is called, we should be fine?

Author

@cloud-fan, the assumption here is that there are two references to the underlying map: the upstream iterator and the external map itself.
The destroy method first removes the ref via the upstream iterator, then delegates to the method that clears the ref via the external map member (currentMap, I think), so unless we've missed another ref we should be fine.

As I wrote above, I think there's a potentially more fundamental issue with CompletionIterator, which keeps holding references via its sub and completionFunction members; these might keep some objects from being collected and could be released upon exhaustion of the iterator. There might be some more 'candidates' like LazyIterator and InterruptibleIterator; I think this deserves some more investigation.

Author

@cloud-fan, do you think this is worth doing? I'm referring to the CompletionIterator delaying GC of the sub iterator and cleanup function (usually a closure referring to a larger collection).
If so, I'd open a separate JIRA+PR for this.

@Ngone51
Member

Ngone51 commented May 19, 2018

cc @JerryLead

@eyalfa
Author

eyalfa commented May 20, 2018

Well, I took the time to figure out how the iterator is eventually used.
(Most of) it boils down to org.apache.spark.scheduler.ShuffleMapTask#runTask, which does:
writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
Looking at the org.apache.spark.shuffle.ShuffleWriter#write implementations, it seems all of them first exhaust the iterator and then perform some kind of postprocessing, i.e. merging spills, sorting, writing partition files and then concatenating them into a single file... Bottom line: the iterator may actually be 'sitting' for some time after reaching EOF.
I'll implement the 'simple approach' for this PR, but I think this deserves a separate JIRA issue + PR.

Eyal Farago added 2 commits May 20, 2018 11:51
…showing that an exhausted iterator still reffers the underlying map.
…SpillableIterator.toCompletionIterator and use that instead of 'maually' creating the completion iterator. also introduced SpillableIterator.destoy which removes the reference to the upstream iterator and calls freeCurrentMap().
@SparkQA

SparkQA commented May 20, 2018

Test build #90854 has finished for PR 21369 at commit d5ee172.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@eyalfa
Author

eyalfa commented May 20, 2018

retest this please

@SparkQA

SparkQA commented May 20, 2018

Test build #90861 has finished for PR 21369 at commit d5ee172.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@advancedxy advancedxy left a comment


The change generally looks good to me except for some style issues.
We should also make sure this change indeed fixes the memory leak, so it would be appreciated if @JerryLead could verify it.

private val sortedMap = CompletionIterator[(K, C), Iterator[(K, C)]](destructiveIterator(
currentMap.destructiveSortedIterator(keyComparator)), freeCurrentMap())
private val sortedMap = destructiveIterator(
currentMap.destructiveSortedIterator(keyComparator))
Contributor

These two lines can be merged into one line?

Author

Unfortunately no: scala-style enforces a max of 100 chars per line.

}
}

def destroy() : Unit = {
Contributor

Should be private, like freeCurrentMap.

Author

yes, fixing

upstream = Iterator.empty
}

def toCompletionIterator: CompletionIterator[(K, C), SpillableIterator] = {
Contributor

I'd prefer private for this method

Author

done


/**
* A comparator which sorts arbitrary keys based on their hash codes.
* A comparator which sorts arbitrary keys bas on their hash codes.
Contributor

typo?
bas -> based.

import org.apache.spark.util.CompletionIterator

class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext{
Contributor

Note the space. LocalSparkContext {

@JerryLead
Contributor

Thank you all for fixing this issue. I'm sorry, but I'm currently writing a research paper about Spark GC and going to submit it next month. Since there is a lot of paper work to do, the verification may be performed next month.

@SparkQA

SparkQA commented May 22, 2018

Test build #90943 has finished for PR 21369 at commit 1d1ddce.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@eyalfa
Author

eyalfa commented May 22, 2018

@advancedxy, using jvisualvm + a heap dump I could see that the second introduced test case ("drop all references to the underlying map once the iterator is exhausted") eliminated all references to the underlying map:
the heap contained one instance of SizeTrackingAppendOnlyMap, but all references to it were unreachable, hence it was due to be collected.
I found one instance of CompletionIterator (actually an anonymous class deriving from it) which had references to the SpillableIterator: a direct ref as member 'sub' and another via member completionFunction$1.
The SpillableIterator had a single ref to the ExternalAppendOnlyMap, whose currentMap field was already nullified.
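The same reachability argument can also be made mechanically with a weak reference, in the spirit of the scala AssertUtil approach referenced later in this thread. This is a hedged, generic sketch (names like weakRefTo and assertCollected are hypothetical helpers, not the suite's actual code):

```scala
import java.lang.ref.WeakReference

object Reachability {
  // Build the object inside `make` so no strong reference escapes the call;
  // only the weak reference survives.
  def weakRefTo[T <: AnyRef](make: () => T): WeakReference[T] =
    new WeakReference(make())

  // Force GC (with retries, since collection is not immediate) and check
  // that the referent was collected, i.e. nothing holds it strongly.
  def assertCollected[T <: AnyRef](ref: WeakReference[T]): Unit = {
    var tries = 0
    while (ref.get() != null && tries < 20) {
      System.gc()
      Thread.sleep(10)
      tries += 1
    }
    assert(ref.get() == null, "object is still strongly reachable")
  }
}
```

Applied to this PR, one would take a weak ref to the internal map, exhaust the iterator, and then assert the map was collected; if any path (iterator, cleanup closure, etc.) still held it, the assertion would fail.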

def destructiveIterator(inMemoryIterator: Iterator[(K, C)]): Iterator[(K, C)] = {
readingIterator = new SpillableIterator(inMemoryIterator)
readingIterator
readingIterator.toCompletionIterator
Member

This changes the original behavior of destructiveIterator. I'd prefer to do it like this:

CompletionIterator[(K, C), Iterator[(K, C)]](
destructiveIterator(currentMap.iterator), readingIterator.destroy)

which keeps compatibility with the current code and does not introduce an unnecessary function.

Author

What behavior does it change? Your suggested code does exactly the same thing but is less streamlined and relies on an intermediate value (fortunately it's already a member variable).

Member

@Ngone51 Ngone51 May 22, 2018

destructiveIterator should just return a destructive iterator (especially for the map buffer), as its function name implies; it is none of CompletionIterator's business. And developers should be free to define the completion function for the returned destructive iterator, in case we want a different one somewhere else in the future.

Your suggested code does exactly the same thing but is less streamlined

I don't think this little change has a huge influence on 'streamlined'.

and relies on an intermediate value (fortunately it's already a member variable)

The current fix leads to this, not me. And even if this variable were not a member variable, we could define a temp local variable. It's not a big deal.


private def destroy() : Unit = {
freeCurrentMap()
upstream = Iterator.empty
Member

Why empty, not null?

Author

It's safer: the class remains usable if for some reason hasNext is called again, and this costs absolutely nothing.
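The point is easy to demonstrate with a toy class (hypothetical, not Spark code): after destroy(), calls still succeed because Iterator.empty is a valid, permanently-drained iterator, whereas a null field would throw.

```scala
// destroy() drops the data by pointing upstream at Iterator.empty;
// subsequent hasNext calls safely return false instead of throwing an NPE.
class DestroyableIterator[T](private var upstream: Iterator[T]) extends Iterator[T] {
  def destroy(): Unit = upstream = Iterator.empty // not null: stays usable
  override def hasNext: Boolean = upstream.hasNext
  override def next(): T = upstream.next()
}
```

Since Iterator.empty is a shared singleton, assigning it keeps no extra memory alive while preserving the Iterator contract.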


+1

sc.stop()
}

test("spill during iteration") {
Member

@Ngone51 Ngone51 May 22, 2018

I understand what this test wants to do. But it seems code without this PR could also pass it if everything goes normally. And I know it's a little hard to reflect the change in a unit test. So, I'd prefer to leave some comments in the source code above explaining the potential memory leak.

Author

This test was written BEFORE the actual fix, and it did fail up until the fix was in place. I do agree it's a bit clumsy, and potential future changes may break the original intention of the test. I've referenced a potential testing approach (currently limited to Scala's own source code) which couldn't (easily) be applied to this code base, so I made a best effort to test this.
I agree this needs better documentation; I'll start by referring to the issue in the test's name and will also add comments to the code.

@advancedxy
Contributor


Thanks. cc @cloud-fan for final review

assert(keys == (0 until 100))
}

test("drop all references to the underlying map once the iterator is exhausted") {
Contributor

let's also put the jira number in the test name.

Author

👍

upstream = Iterator.empty
}

private[ExternalAppendOnlyMap]
Contributor

it's pretty reasonable to have this method public.

Author

Hmm... the class itself is private (slightly relaxed to package private to ease testing), so I'm not sure what the benefit of making the method public would be.
In any case, I think that once we see a use case for making this method public we'd probably have to further change the iterator/external map classes.

Contributor

It's weird to see a class-private method. I'd suggest just removing private[ExternalAppendOnlyMap]; spill is only called in ExternalAppendOnlyMap and it's public.

val nextUpstream = spillMemoryIteratorToDisk(upstream)
assert(!upstream.hasNext)
hasSpilled = true
upstream = nextUpstream
Contributor

destroy re-assigns upstream, once destroy is called, we should be fine?

assert(map.numSpills == 0, "map was not supposed to spill")

val it = map.iterator
assert( it.isInstanceOf[CompletionIterator[_, _]])
Contributor

nit: no space after assert(

assert(map.currentMap == null)
assert(underlyingIt.upstream ne underlyingMapIterator)
assert(underlyingIt.upstream.getClass != underlyingMapIteratorClass)
assert(underlyingIt.upstream.getClass.getEnclosingClass != classOf[AppendOnlyMap[_, _]])
Contributor

we want to prove we are no longer holding the reference, why do we check type here?

Author

The underlying map's iterator is an anonymous class; this is the best I could come up with to check whether the upstream iterator holds a ref to the underlying map.
@cloud-fan, do you have a better idea? (I'm not 100% happy with this one.)

Contributor

can we simply check assert(underlyingIt.upstream eq Iterator.empty)?

Author

Hmm, we can in line 508 but not in this test.
In this test we look at the iterator immediately after a spill; at this point upstream is supposed to have been replaced by a DiskMapIterator. I guess we can check for this directly (after relaxing its visibility to package private).

In line 508, we can simply compare with Iterator.empty.

@cloud-fan
Contributor

cc @JoshRosen

@SparkQA

SparkQA commented May 24, 2018

Test build #91065 has finished for PR 21369 at commit e3c61fd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Eyal Farago added 3 commits May 27, 2018 14:17
@hvanhovell
Contributor

retest this please

@SparkQA

SparkQA commented Aug 10, 2018

Test build #94559 has finished for PR 21369 at commit 621bd23.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Eyal Farago added 5 commits August 11, 2018 21:36
…e wek ref to assert the map is no longer reachable.
…x tests to effectively test for non-reachabillity of the internal map.
…o SPARK-22713__ExternalAppendOnlyMap_effective_spill__weak_ref_test
…k_ref_test' into SPARK-22713__ExternalAppendOnlyMap_effective_spill
@eyalfa
Author

eyalfa commented Aug 11, 2018

retest this please

@eyalfa
Author

eyalfa commented Aug 11, 2018

@hvanhovell, thanks for picking this up 😎

@SparkQA

SparkQA commented Aug 11, 2018

Test build #94626 has finished for PR 21369 at commit 1bfca67.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 12, 2018

Test build #94627 has finished for PR 21369 at commit 1bfca67.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 12, 2018

Test build #94629 has finished for PR 21369 at commit 25be99b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

/**
* Exposed for testing
*/
@volatile private[collection] var readingIterator: SpillableIterator = null
Contributor

This is not exposed in the test.

/**
* Exposed for testing
*/
private[collection] class SpillableIterator(var upstream: Iterator[(K, C)])
Contributor

ditto

// https://github.com/scala/scala/blob/2.13.x/test/junit/scala/tools/testing/AssertUtil.scala
// (lines 69-89)
// assert(map.currentMap == null)
eventually{
Contributor

nit: add a space eventually {

}
}

private def destroy() : Unit = {
Contributor

nit: no space before :

}

def toCompletionIterator: CompletionIterator[(K, C), SpillableIterator] = {
CompletionIterator[(K, C), SpillableIterator](this, this.destroy )
Contributor

nit: no space before )

// (lines 69-89)
assert(map.currentMap == null)

eventually{
Contributor

ditto

@cloud-fan
Contributor

LGTM except some code style issues. Thanks for improving the test!

@SparkQA

SparkQA commented Aug 13, 2018

Test build #94670 has finished for PR 21369 at commit 855854a.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@eyalfa
Author

eyalfa commented Aug 13, 2018

retest this please

@SparkQA

SparkQA commented Aug 13, 2018

Test build #94678 has finished for PR 21369 at commit 855854a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

thanks, merging to master!

@asfgit asfgit closed this in 2e3abdf Aug 13, 2018
otterc pushed a commit to linkedin/spark that referenced this pull request Mar 22, 2023
…teration


Closes apache#21369 from eyalfa/SPARK-22713__ExternalAppendOnlyMap_effective_spill.

Authored-by: Eyal Farago <eyal@nrgene.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>

(cherry-picked from commit 2e3abdf)

Ref: LIHADOOP-40170

RB=1413724
BUG=LIHADOOP-40170
G=superfriends-reviewers
R=fli,mshen,yezhou,edlu
A=yezhou
8 participants