Improve memory for HashBuilderOperator unspill#16212

Merged
arhimondr merged 2 commits into prestodb:master from viczhang861:hash_builder_spill
Jun 24, 2021

Conversation

@viczhang861
Contributor

@viczhang861 viczhang861 commented Jun 4, 2021

What are improved:

  • Destroy unspillInProgress when it is no longer needed, freeing the memory used by unspilled data.
== NO RELEASE NOTE ==

@viczhang861 viczhang861 requested a review from a team June 4, 2021 20:34
@viczhang861 viczhang861 force-pushed the hash_builder_spill branch from 4487c91 to d63d1cf June 4, 2021 21:22
Member

@arhimondr arhimondr left a comment

Let me try to summarize how I understand this change.

This PR moves the unspilling process from a background thread to the main thread. This is needed to be able to compact page index iteratively to save memory. Please let me know if my understanding is incorrect.

LGTM % nits

Member

Use log.debug or log.info (here and other places)

Contributor Author

Done. I was using stdout for debugging purposes, as it is easier to access on a Spark node.

Page compaction is a different issue; I will open a new PR to address it separately. In short, compaction makes a copy of the data and corrects a bug in the calculation of retained size for a deserialized page.

Member

It feels like we need to get rid of this codepath altogether. Loading pages in a background thread is generally a very bad idea, as the CPU used for page decoding is not accounted for. Once we remove this codepath we would be able to get rid of the ListenableFuture<List<Page>> getAllSpilledPages interface and the thread pools used internally in Spiller. Do you think we can try to gradually enable this codepath in production and remove it eventually?

CC: @rschlussel @highker

@arhimondr arhimondr closed this Jun 10, 2021
@arhimondr arhimondr reopened this Jun 10, 2021
@viczhang861 viczhang861 force-pushed the hash_builder_spill branch 2 times, most recently from eaf85b4 to ac8648e June 11, 2021 04:49
Member

@arhimondr arhimondr left a comment

Sorry for not mentioning it initially. Could you please add a TestPrestoSparkSpilledJoins test and enable the new code path there, so it remains tested?

Contributor

Curious why you reordered these. Previously they were in the order in which you would expect to encounter the different states.

Contributor Author

The grouping was suggested by IntelliJ, maybe by some lint rule.

Contributor

Could you undo this bit? I think it's easier to understand the different states when the options are listed in the order in which they happen.

Contributor

What's the downside of this change? Why do we only do it for spill-in-small-memory-pool?

Contributor Author

Eventually I am planning to remove INPUT_UNSPILLING and related code completely, but I prefer to do it gradually just in case there is a regression. Another reason is to make testing and rollout easier: I can test and experiment with multiple changes without worrying about breaking a release.

@viczhang861 viczhang861 requested a review from rschlussel June 14, 2021 15:59
@viczhang861
Contributor Author

Rebased; trying to pass facebook-integration.

@rschlussel
Contributor

The presto-spark job timed out due to hanging join spill tests. Can you look into it?
2021-06-15T13:05:19.448-0500 WARN TestHangMonitor com.facebook.presto.testng.services.LogTestDurationListener No test started or completed in 8.00m. Running tests:
com.facebook.presto.spark.TestPrestoSparkSpilledJoinQueries::testLimitWithJoin running for 8.03m
com.facebook.presto.spark.TestPrestoSparkJoinQueries running for 48.40m
com.facebook.presto.spark.TestPrestoSparkSpilledJoinQueries running for 48.39m
com.facebook.presto.spark.TestPrestoSparkJoinQueries::testLimitWithJoin running for 8.06m.

@viczhang861 viczhang861 changed the title Improve memory for HashBuilderOperator unspill [Test]Improve memory for HashBuilderOperator unspill Jun 16, 2021
@viczhang861
Contributor Author

viczhang861 commented Jun 16, 2021

@arhimondr @rschlussel com.facebook.presto.spark.TestPrestoSparkSpilledJoinQueries::testLimitWithJoin is a flaky test (it passed before the rebase but failed in recent runs). I was able to reproduce it locally but could not figure out the root cause after spending more than half a day. However, it is not related to this PR, so I disabled it for now.

TestPrestoSparkJoinQueries takes 58 minutes to complete; have you seen such a large wall-time variance before?

2021-06-16T11:42:20.167-0500 WARN pool-2-thread-1 com.facebook.presto.testng.services.LogTestDurationListener Tests from com.facebook.presto.spark.TestPrestoSparkJoinQueries took 58.01m
2021-06-16T00:05:41.095-0500 WARN pool-2-thread-1 com.facebook.presto.testng.services.LogTestDurationListener Tests from com.facebook.presto.spark.TestPrestoSparkAbstractTestJoinQueries took 5.61m

@rschlussel
Contributor

We saw spill join tests hang for other modules before #15975, but that PR fixed it 100% for those cases.

@viczhang861
Contributor Author

@rschlussel It could be a similar issue; tuning task.concurrency and the hash partition count prevents the issue from happening, but I need more time to figure out the root cause.

@viczhang861
Contributor Author

viczhang861 commented Jun 17, 2021

Lowering task concurrency improves the presto-spark-base test suite run time to 41 minutes; I will rerun a few more times to confirm.

  • ci / test (:presto-spark-base) (pull_request) Successful in 41m 2021-06-16T22:54:59.323-0500
  • WARN pool-2-thread-1 com.facebook.presto.testng.services.LogTestDurationListener Tests from com.facebook.presto.spark.TestPrestoSparkSpilledJoinQueries took 31.05m

Rerun:

  • ci / test (:presto-spark-base) (pull_request) Successful in 43m
  • 2021-06-17T12:40:10.393-0500 WARN pool-2-thread-1 com.facebook.presto.testng.services.LogTestDurationListener Tests from com.facebook.presto.spark.TestPrestoSparkSpilledJoinQueries took 32.68m

Rerun 06-21-2021:

  • ci / test (:presto-spark-base) (pull_request) Successful in 33m

@viczhang861 viczhang861 changed the title [Test]Improve memory for HashBuilderOperator unspill Improve memory for HashBuilderOperator unspill Jun 17, 2021
@arhimondr
Member

arhimondr commented Jun 18, 2021

Based on my understanding this PR addresses two problems:

  1. The memory retained by HashBuilderOperator#unspillInProgress (https://github.com/prestodb/presto/blob/master/presto-main/src/main/java/com/facebook/presto/operator/HashBuilderOperator.java#L225) is never released and never accounted for. After the probe side is matched with the unspilled partition, the current unspilled lookup table is released, its memory is returned, the next partition is unspilled, and so on. However, the memory for the released partition is still retained by HashBuilderOperator#unspillInProgress, so the spill is not actually effective (eventually it uses the same amount of memory as with no spilling at all). (Update: eventually the operator is destroyed and the memory gets reclaimed; however, the pages retained by the unspillInProgress future may still cause unwanted memory spikes in between.) The least intrusive fix for this specific issue would be something like:
  // Use a Queue so that Pages already consumed by the Index are not retained by us.
  Queue<Page> pages = new ArrayDeque<>(getDone(unspillInProgress.get()));
  // Do not retain the pages through the future
  unspillInProgress = Optional.empty();
  2. The memory estimate after unspill is incorrect. The assumption made here is that the pages after unspill are always compact, as the getSpilledPagesInMemorySize calculation assumes so. Pages with a mix of VariableWidthBlock and other blocks may be deserialized into a non-compact form: Fix retained size after Page deserialization #16248. That results in an under-reservation of memory before unspill that sometimes leads to an OOM. While we are still discussing whether it is right to copy the VariableWidthBlock content (similarly to how we do this for other types of block), a quick non-intrusive fix for this problem could be to simply compact pages when unspilling somewhere here.
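The drain-through-a-queue fix sketched in item 1 can be illustrated with a minimal, self-contained stand-in (pages are plain Strings and the future is modeled as an already-completed Optional<List<String>> — not Presto's real Page, ListenableFuture, or HashBuilderOperator types):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Queue;

class UnspillSketch
{
    // stands in for unspillInProgress (a completed future holding all unspilled pages)
    private Optional<List<String>> unspillInProgress =
            Optional.of(new ArrayList<>(List.of("page1", "page2", "page3")));

    int indexUnspilledPages()
    {
        // Copy into a Queue so that pages handed to the index stop being reachable from here
        Queue<String> pages = new ArrayDeque<>(unspillInProgress.get());
        // Drop the future reference so it no longer retains the full page list
        unspillInProgress = Optional.empty();

        int indexed = 0;
        while (!pages.isEmpty()) {
            pages.poll(); // "add to index"; the queue releases its reference to the page
            indexed++;
        }
        return indexed;
    }

    boolean retainsFuture()
    {
        return unspillInProgress.isPresent();
    }
}
```

Once the Optional is cleared and each page is polled off the queue, nothing in the operator retains the page anymore, so pages become collectible one by one instead of all living until the operator is destroyed.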

However, in addition to fixing these two bugs, this PR also changes an important aspect of the spilling design. For some reason spilling was designed to do spills / unspills in the background when spilling / unspilling cannot be done iteratively (one or a few pages at a time).

Unspill is only done synchronously when it can be done iteratively (through WorkProcessor).

I'm not exactly sure why the decision to spill in the background was made, but I guess it is related to a high-level design principle of the Operator interface. By design, Operator interface methods shouldn't be long-running, so spilling in the background avoids a single long-running call on the Operator interface where we would need to crunch through gigabytes of data. However, the downside is that the CPU spent on unspill is not accounted for, as it happens in a background thread outside of the TaskExecutor thread pool and the Driver execution loop.

It's an interesting tradeoff, and it feels like the right long-term solution would be to move the spill/unspill process from the background thread back to a thread managed by the TaskExecutor. However, to avoid any long-term implications of breaking the Operator interface design assumptions, it feels like while doing that we should make the spill / unspill process iterative (process only a few pages at a time). But this might be a bigger refactor, and currently it doesn't feel like the biggest problem we have in spilling.
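The iterative shape described above could look roughly like the following sketch (a hypothetical simplification, not Presto's WorkProcessor API): each call on the operator thread processes only a bounded number of pages, so no single Operator call runs long and the CPU stays accounted for by the task executor.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;

class IterativeUnspill
{
    private static final int PAGES_PER_CALL = 2; // bound on work done per invocation
    private final Queue<String> spilled;
    private int indexed;

    IterativeUnspill(List<String> spilledPages)
    {
        this.spilled = new ArrayDeque<>(spilledPages);
    }

    // Processes at most PAGES_PER_CALL pages; returns true while more work remains.
    boolean process()
    {
        for (int i = 0; i < PAGES_PER_CALL && !spilled.isEmpty(); i++) {
            spilled.poll(); // read one page and hand it to the index
            indexed++;
        }
        return !spilled.isEmpty();
    }

    int indexedPages()
    {
        return indexed;
    }
}
```

The driver loop would call process() repeatedly between other work, giving the scheduler a chance to account for CPU and yield between chunks.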

@rschlussel @highker Since you guys are the spilling experts, we would love to hear your opinion. Do you think it makes sense to move the unspilling process in this single place out of the background without making it iterative? Or do you think we should postpone this decision, leave it as is, fix the bugs, and come back to it later once we are ready to approach this problem more holistically?

@arhimondr
Member

Fix for the hanging test: #16293

@rschlussel
Contributor

That's a good point that I hadn't considered. I don't think it makes sense to move spill to the main thread without making it iterative (certainly not without very rigorous testing). It might not be so bad for Presto-on-Spark, because you'll only affect the query that's doing the spilling, but in a multitenant environment breaking that assumption can negatively affect other queries.

@viczhang861
Contributor Author

viczhang861 commented Jun 21, 2021

  1. In the unspilling step, the operator cannot make any progress until unspilling is completely done. Having the main thread blocked and waiting for unspilling to finish is the same as letting the main thread do the unspilling itself. The reason spilling was done in a different thread is the choice of implementation in SingleStreamSpiller.
  • Pros of spilling in the main thread: getting rid of unspillInProgress is clean and guarantees the issue is completely fixed.
  • Pros of having unspillInProgress run in another thread: it makes it possible to cancel the job, at the cost of increased complexity.

It is important to allow cancelling the long-running thread, so it makes sense to keep using unspillInProgress.

  2. The hanging test is a long-standing issue (Investigate often stuck test TestDistributedSpilledQueries::testLimitWithJoin #13859); it is not caused by this PR.
    It also fails when I set configProperties.put("experimental.spill-in-small-memory-pool", "false");
    This session property makes it convenient and useful for debugging.

  3. Test timeout
    I found that decreasing task concurrency from 16 to 2 for join queries improves test run time, and it doesn't hide any failing/passing tests. TestSpilledAggregation also uses a task concurrency of 2.

Contributor Author

This line looks like a trivial change, but it is not. The INPUT_UNSPILLING state requires unspillInProgress to be present; that's why it can only be destroyed after the state changes to the next state. In case there are other dependencies on unspillInProgress, the code path is controlled by this session property.

Member

Operator access is always single-threaded and the finishLookupSourceUnspilling invocation is atomic. Thus, in theory, an intermittent state inconsistency should never be observed.

Could you please elaborate more on why the unspillInProgress reference cannot be set to Optional.empty() right after the Queue<Page> pages = new ArrayDeque<>(unspilledPages);?

Contributor

What was being retained here without nulling it out? The comment earlier suggests we use a queue so that we don't retain the unspilled pages after they get added to the index. Were we still actually holding on to all of them because of the unspillInProgress future? Did that cause JVM OOMs in the small-memory-pool environment? It wasn't being accounted for in the query memory, so it wouldn't have caused query OOMs.

Member

I'm having a hard time trying to understand how this codepath improves the memory footprint for small memory pools.

From what I understand there are two differences:

  1. The new code path updates the memory reservation in one shot
  2. The new code path nullifies the unspillInProgress reference

While the second difference makes sense to me, could you please elaborate more on the first?

Contributor Author

  1. The new code path updates the memory reservation in one shot
    This line can be moved before the if condition, but if so, refactoring of the old path is needed. I prefer not to refactor the old path.

Contributor

I'm also confused about this change. It looks like the main difference between this and the else block is that we are adding all the pages to the index at once, and no longer updating the memory after adding each page to the index (with the queue it's no longer part of retainedSizeOfUnspilledPages, but it will be accounted for in index.getEstimatedSize()). However, this will not accurately reflect the memory used by the index in the meantime, and I wonder if that could cause its own problems.

If the problem is just that the unspilled pages are retained by the unspillInProgress future, why not clear it as soon as we add them to the queue? That way we can get a more accurate accounting from the index as we add pages to it, and still get the benefit of having a page only in the queue or the index, but not both.

Contributor Author

@rschlussel Because if unspillInProgress is not cleared, removing a page from the queue won't really release memory. As @arhimondr pointed out, this function is atomic, so it should be safe to clear it and update memory while removing pages from the queue.

Contributor

I think we might be saying the same thing here. I'm suggesting nulling out unspillInProgress as soon as we create the queue and leaving everything else as it was before this change. That way removing a page from the queue will release its memory.


Member

The spilling / unspilling operations in the current implementation are non-cancellable. If the task is already scheduled on a threadpool, cancellation will be a no-op. It feels like if we want to improve cancellation, we also need to make sure the background spilling tasks are cancellable.

Contributor

@arhimondr Can you explain why this isn't cancellable? Won't it get interrupted?

Member

@rschlussel It will get the interrupt flag set, but the task is not checking it.
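A minimal sketch of the point being made (hypothetical names; nothing here is Presto code): cancelling a running task only sets the worker thread's interrupt flag, so a loop that never polls the flag runs to completion anyway, while a cooperative loop stops early.

```java
class CancellableWork
{
    // Processes up to `pages` pages; the cooperative variant polls the interrupt flag.
    static int run(int pages, boolean cooperative)
    {
        int done = 0;
        for (int i = 0; i < pages; i++) {
            if (cooperative && Thread.currentThread().isInterrupted()) {
                break; // honor cancellation
            }
            done++; // simulate decoding one page
        }
        return done;
    }
}
```

Simulating a cancel by setting the current thread's interrupt flag: the non-cooperative variant still processes every page, while the cooperative one stops immediately.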

Contributor

Ah, that's too bad. In that case, I think it could be misleading to cancel the task, since it would be a no-op. It would be better to do this in conjunction with adding proper cancellation support.

Contributor Author

@rschlussel Removed the cancellation code; I will handle that in a separate PR.

Member

I'm a little confused about this test. If the goal is to disable it, why is it necessary to change the implementation?

Contributor Author

It narrows down which query caused the failure.

Member

Do you think leaving a note in a comment section would suffice?

Contributor

+1 I think a comment would be more helpful. Otherwise it's likely that someone will just switch it to enabled.

Member

I would recommend calling super.testLimitWithJoin() or simply leaving the method body empty (maybe with only the comment explaining why the test is disabled) instead of copying the entire test implementation (similarly to what we do in other places: https://github.com/prestodb/presto/blob/master/presto-spark-base/src/test/java/com/facebook/presto/spark/TestPrestoSparkAbstractTestQueries.java#L74, https://github.com/prestodb/presto/blob/master/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveDistributedQueries.java#L67)

@arhimondr
Member

In unspilling step, operator cannot make any progress until unspilling is completely done. Having main thread blocked and waiting for unspilling done is same as letting main thread finish. The reason spilling was done in a different thread is the choice of implementation in SingleStreamSpiller

An operator that is blocked is removed from scheduling until it is unblocked. The threads from the main thread pool can be reused for running other operators, tasks or queries.

Hanging test is a long existing issue in #13859, it is not caused by this PR.

It looks like the issue described here is fixed in Trino and backported to prestodb by @rschlussel : #15975. This looks like a different issue (#16293).

Member

@arhimondr arhimondr left a comment

This PR now only addresses a single problem: the unspillInProgress future over-retaining pages. The second problem, pages being non-compact after unspill, is not addressed here. It should be fine to address it in a separate PR. Could you please update the commit messages accordingly?


Member

Why is this extra test needed? There should be plenty of join-without-limit tests in the AbstractTestJoinQueries test suite.

Contributor Author

@viczhang861 viczhang861 Jun 23, 2021

  • It was used for comparison with the same query when a limit was added. The test was removed, and the reason is described in a comment.
  • The issue of pages not being compacted will be handled in a different PR so that @rschlussel doesn't get confused, as she didn't have context about the other issue.

 - Before the fix, all unspilled data is read into memory
   and held until the HashBuilderOperator is destroyed.
   Nullifying the unspilled pages allows the memory to be freed.
 - Add a Presto on Spark spill test for join queries.
 - Add a Presto spill test for join queries.
   The test hangs when spilling is enabled for
   Presto on Spark.
@viczhang861 viczhang861 requested a review from arhimondr June 23, 2021 21:21
@viczhang861
Contributor Author

TestMongoDistributedQueries fails; created #16326 to track.
