Skip to content

Conversation

@cloud-fan
Copy link
Contributor

@cloud-fan cloud-fan commented Jul 19, 2017

What changes were proposed in this pull request?

UnsafeExternalSorter.recordComparator can be either KVComparator or RowComparator, and both of them will keep the reference to the input rows they compared last time.

After sorting, we return the sorted iterator to upstream operators. However, the upstream operators may take a while to consume up the sorted iterator, and UnsafeExternalSorter is registered to TaskContext at here, which means we will keep the UnsafeExternalSorter instance and keep the last compared input rows in memory until the sorted iterator is consumed up.

Things get worse if we sort within partitions of a dataset and coalesce all partitions into one, as we will keep a lot of input rows in memory and the time to consume up all the sorted iterators is long.

This PR takes over #18543 , the idea is that, we do not keep the record comparator instance in UnsafeExternalSorter, but a generator of record comparator.

close #18543

How was this patch tested?

N/A

@cloud-fan
Copy link
Contributor Author

This bug was originally reported by @j-baker

also cc @kiszk and @srowen to review.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Override?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since SortComparator does not extend RecordComparator, @Override may not be necessary.

@kiszk
Copy link
Member

kiszk commented Jul 19, 2017

LGTM

@j-baker
Copy link

j-baker commented Jul 19, 2017

I'm not quite convinced this works in the way we want if spilling occurs.

@j-baker
Copy link

j-baker commented Jul 19, 2017

I would propose copying the comparator before giving it to anything that might use this, so that we never hold onto a comparator that someone might have left dirty.

@SparkQA
Copy link

SparkQA commented Jul 19, 2017

Test build #79746 has finished for PR 18679 at commit f5752d4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Copy link
Contributor Author

Since this PR definitely fixes the non-spilling case, I'm going to merge it and @j-baker can continue his work to fix the spilling case(if it's problematic).

@j-baker
Copy link

j-baker commented Jul 19, 2017

But the spilling case is the issue?

@j-baker
Copy link

j-baker commented Jul 19, 2017

I think that #18543 (post rewrite) is a better approach than this - instead of retaining a memory leak, we remove the opportunity for one to appear.

@cloud-fan
Copy link
Contributor Author

I'm not convinced spilling case is the issue, can you verify that after this PR, your workload still OOM?

@j-baker
Copy link

j-baker commented Jul 19, 2017

So in my build, I definitely have spilling (quite a lot of it). If I have spilling, then the last thing that uses the comparator will definitely be the merge sorter. The last thing that uses the comparator is the thing that causes the memory leak.

@j-baker
Copy link

j-baker commented Jul 19, 2017

Oh, or are you saying that because in the external case most of the buffers stay on disk, you wouldn't see this issue because there's not enough stuff in memory? That'd make sense.

This seems fine to merge - not sure if you prefer the approach in my PR (make sure the comparator exists only in ephemeral objects).

@cloud-fan
Copy link
Contributor Author

I left a comment in #18543 (comment)

your concern is valid, in the case of coalesce, we may have many mergers in one task, and some mergers may finish earlier and waiting for the end of the task to do cleanup.

There are 3 possible fixes:

  1. do cleanup at the end of the in-memory sorter and the merger. Like this PR did
  2. copy the comparator when passing to the in-memory sorter and the merger. Like [SPARK-21319][SQL] Fix memory leak in UnsafeExternalRowSorter.RowComparator #18543 did
  3. avoid referring UnsafeExternalSorter in TaskContext.

@cloud-fan
Copy link
Contributor Author

cc @hvanhovell

@cloud-fan
Copy link
Contributor Author

cloud-fan commented Jul 19, 2017

Personally I prefer option 3 if it's doable.

3 is not doable, this PR goes with option 2.

@cloud-fan
Copy link
Contributor Author

cc @ueshin

@SparkQA
Copy link

SparkQA commented Jul 26, 2017

Test build #79971 has finished for PR 18679 at commit 9b88297.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Copy link
Member

@ueshin ueshin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM except for 1 comment.

this, taskMemoryManager, recordComparator, prefixComparator, initialSize, canUseRadixSort);
this,
taskMemoryManager,
recordComparatorSupplier.get(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like we need to null check for recordComparatorSupplier.

@SparkQA
Copy link

SparkQA commented Jul 27, 2017

Test build #79994 has finished for PR 18679 at commit 4903670.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Copy link
Contributor Author

retest this please

@hvanhovell
Copy link
Contributor

LGTM - pending jenkins

@SparkQA
Copy link

SparkQA commented Jul 27, 2017

Test build #80000 has finished for PR 18679 at commit 4903670.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Copy link
Contributor Author

thanks for the review, merging to master!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

7 participants