
Conversation

@zhzhan
Contributor

zhzhan commented Jul 20, 2017

What changes were proposed in this pull request?

Currently the memory-leak error message is degraded to a warning, but the leak does happen and impacts the performance of running jobs. This diff fixes the memory leak caused by SortMergeJoin.

The diff exhausts the iterators, even when doing so is not required for the join result, in order to make sure they are fully consumed and the memory they hold is released.
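
A minimal sketch of the pattern (leftIter and rightIter follow the diff quoted below; advanceNext is the existing RowIterator method):

def destruct(): Unit = {
  // Drain both sides so the underlying row sorters reach end-of-stream,
  // which is where they release their last in-memory page.
  while (leftIter.advanceNext()) {}   // discard remaining left rows
  while (rightIter.advanceNext()) {}  // discard remaining right rows
}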

How was this patch tested?

Relies on existing unit tests. Also tested in a production job, where the memory leak is fixed by the diff.

@SparkQA

SparkQA commented Jul 20, 2017

Test build #79807 has started for PR 18694 at commit b8acae2.


def destruct(): Unit = {
  while (leftIter.advanceNext()) {}
  while(rightIter.advanceNext()) {}
Member

Nit: add a space between while and (.

@gatorsmile
Member

retest this please

@gatorsmile
Member

What is the perf impact and how large is it?


def destruct(): Unit = {
  while (streamedIter.advanceNext()) {}
  while (bufferedIter.advanceNext()) {}
Contributor

Can you explain more about where the memory leak is? Why do we have to exhaust the iterators?

Member

viirya Jul 21, 2017

I think if we don't advance to the next rows, the iterators should not load those rows (although sort consumes its whole input iterator up front in order to sort). If you exhaust the iterators, you actually spend unnecessary time pulling and processing those rows.

Contributor Author

zhzhan Jul 21, 2017

It does introduce extra overhead. The other way is to introduce a new interface for RowIterator to destruct itself, which may be more elegant but needs more changes to core data structures. A memory leak is worse than the extra overhead, because it causes more spills and other issues.

Contributor Author

Details are explained below.

@viirya
Member

viirya commented Jul 21, 2017

Can you show some experiments that indicate there's a memory leak?

@SparkQA

SparkQA commented Jul 21, 2017

Test build #79813 has finished for PR 18694 at commit b8acae2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zhzhan
Contributor Author

zhzhan commented Jul 21, 2017

The memory leak happens in the following scenario. For example, in an inner join, when the left side is exhausted, we stop advancing the right side. Because the right side has not reached the end, the memory it holds is not released and cannot be used by any other operator (for example, UnsafeShuffleWriter), causing more spills.

When the iterator is exhausted, the following line releases all the memory:
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java#L169

Although the readingIterator is constructed here and it can spill, it keeps the currently used page in memory until the caller invokes loadNext again, so the current page cannot be used by others in the meantime. (Note that the last record in loadNext is cloned in order to release the current page: https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java#L167.)
The following comment actually explains why the current page is not spilled. We observed 32M not being released:
https://github.com/apache/spark/blob/master/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java#L519
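
As a rough sketch of the shape of the problem (hypothetical code, not the actual SortMergeJoin implementation):

// The join loop stops as soon as the streamed side runs out.
while (streamedIter.advanceNext()) {
  // ... advance bufferedIter as needed and emit matching rows ...
}
// Here bufferedIter may not have reached its end, so the sorter
// behind it still pins its current in-memory page (the 32M observed
// above) until the task-level cleanup hook runs.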

@viirya
Member

viirya commented Jul 21, 2017

I'd doubt this is actually a memory leak, as UnsafeExternalSorter already avoids memory leaks by registering a cleanup task: https://github.com/apache/spark/blob/master/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java#L159

Btw, as the comment indicates, you still can't avoid holding the memory in cases like an operator followed by a limit.

@zhzhan
Contributor Author

zhzhan commented Jul 21, 2017

Thanks for the comments. "Memory leak" here means the data structure holds memory unnecessarily so that other operators cannot use it; the cleanup hook only runs after the task is done. The diff solves the leak for SortMergeJoin only and does not apply to the limit case. Limit is another special case that needs to be taken care of separately. Actually, this leak happens a lot more often than the limit case, and it is a serious issue.

@viirya
Member

viirya commented Jul 21, 2017

This only makes sense if the downstream operators consume the whole iterator of SortMergeJoin first and then perform their work. If the downstream operators are pipelined with SortMergeJoin, then once the iterator from SortMergeJoin is exhausted, even if one side of the join is not exhausted, the whole pipeline is finished and the cleanup will run, right?

Compared with the overhead this change brings in, how much does it improve performance?

@zhzhan
Contributor Author

zhzhan commented Jul 21, 2017

If it is assumed that the pipeline is simple, with only one operator per stage that needs to spill, you are right. But if the pipeline is more complex, for example with multiple operators that need to spill, this leak can cause serious issues.

@SparkQA

SparkQA commented Jul 21, 2017

Test build #79818 has finished for PR 18694 at commit 2703c1f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Member

viirya commented Jul 21, 2017

Do you observe significant performance improvement with this change?

@zhzhan
Contributor Author

zhzhan commented Jul 21, 2017

Currently the patch helps scenarios such as Join(A, Join(B, C)). It is critical for us because we have some internal usage in which each stage may consist of tens of sort operators. We found that each operator holds memory without releasing its current page, which causes a lot of spills. Such a memory leak becomes critical. (ShuffledHashJoin has similar issues; we did not hit issues caused by Limit.)
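
For illustration only (hypothetical DataFrame code with an assumed join key k, not from the PR), the Join(A, Join(B, C)) shape looks like:

// Each SortMergeJoin sorts both inputs, so one stage can hold
// several UnsafeExternalRowSorters at once; every sorter whose
// iterator is not fully consumed keeps its current page pinned.
val bc  = b.join(c, "k")   // inner join: two sorters
val abc = a.join(bc, "k")  // outer join: two more sorters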

To me, the leak itself is a bug. If it is agreed that we should fix this type of leak, we can find a more elegant way, such as a new close() interface, to avoid the overhead.

@kiszk
Member

kiszk commented Jul 24, 2017

Since registering cleanup as a task-completion hook may not work, as discussed at #18543, I am a little bit supportive of explicitly freeing memory where possible.
On the other hand, I think the current destruct implementation looks time-consuming. Are there any other approaches that free memory quickly and require fewer places to insert destruct?

@cloud-fan
Contributor

IMO the cleanup hook is a workaround for a limitation of the iterator model: although the parent knows when to release its child's resources, there is no way to notify the child via the iterator.

Maybe we can add a close method to the iterator used in Spark SQL?
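
A minimal sketch of what such an interface could look like (CloseableRowIterator and close() are hypothetical here, not existing Spark API; RowIterator is org.apache.spark.sql.execution.RowIterator):

// Hypothetical extension of Spark SQL's RowIterator; close() would
// let a parent release a child's memory without draining its rows.
abstract class CloseableRowIterator extends RowIterator {
  def close(): Unit
}

// A parent operator could then notify the child directly, e.g.:
//   if (!leftIter.advanceNext()) rightIter.close()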

@zhzhan
Contributor Author

zhzhan commented Jul 27, 2017

Closing the PR; will work on adding a close interface for the iterator used in Spark SQL to remove the extra overhead.

@zhzhan zhzhan closed this Jul 27, 2017
@taosaildrone

@zhzhan @cloud-fan, is there a JIRA or PR for the iterator close?
