
Implement Spilling for TopNRowNumber Operator #18400

Merged
rschlussel merged 1 commit into prestodb:master from shrinidhijoshi:SpillableTopnOperator-C
Jan 20, 2023

Conversation

@shrinidhijoshi
Collaborator

@shrinidhijoshi shrinidhijoshi commented Sep 25, 2022

Implement Spilling for TopNRowNumber Operator.

The general idea for spill: when input does not fit in memory, we spill it to disk, sorted by groupId. We keep doing this until all input is consumed. Then, when we want to generate output, we merge-sort all the spill files on disk and process a few groupIds at a time to produce the final output.

What does spill mean in the case of the TopNRowNumber operator?

TopN is calculated using a heap per groupId (i.e., if we are tracking the top 5 for K groups, then we maintain K heaps of max size 5 each). When we decide to spill, we extract all (K*5) rows across these K heaps, ordered by groupId, and serialize them to disk.
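A minimal sketch of the per-group heaps described above (illustrative only; `TopNHeapSketch` and its method names are not from the PR, and the real builder operates on Pages rather than plain long values):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.TreeMap;

// Sketch: K bounded min-heaps (one per group), drained in groupId order when a
// spill is triggered, so the spill file is already sorted by groupId.
public class TopNHeapSketch {
    private final int n;
    // TreeMap keeps groups ordered by groupId, so a spill can write rows in
    // group order without an extra sort.
    private final TreeMap<Integer, PriorityQueue<Long>> heaps = new TreeMap<>();

    public TopNHeapSketch(int n) {
        this.n = n;
    }

    // Keep only the N largest values per group (min-heap bounded at N entries).
    public void add(int groupId, long value) {
        PriorityQueue<Long> heap = heaps.computeIfAbsent(groupId, g -> new PriorityQueue<>());
        heap.add(value);
        if (heap.size() > n) {
            heap.poll(); // evict the smallest, retaining the top N
        }
    }

    // "Spill": drain every heap in groupId order; at most K * N rows total.
    public List<long[]> drainSortedByGroup() {
        List<long[]> rows = new ArrayList<>();
        for (Map.Entry<Integer, PriorityQueue<Long>> entry : heaps.entrySet()) {
            PriorityQueue<Long> heap = entry.getValue();
            while (!heap.isEmpty()) {
                rows.add(new long[] {entry.getKey(), heap.poll()});
            }
        }
        heaps.clear();
        return rows;
    }
}
```

Because the drained rows are already ordered by groupId, each spill file can later be merge-sorted against the others with a simple k-way merge.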

What does unspill mean?

Unspill has 2 parts/stages (pipelined).
1st part - merge-sorts the entries across all the spill files and creates new Pages at group boundaries. This means that each page generated by the merge sort will contain ALL the data for some of the groups. This helps us in the next stage.
2nd part - We create an inMemoryGroupedTopNBuilder and pass each page to it. The distinction here (vs. the inMemoryGroupedTopNBuilder we created during input) is that as soon as we process the first page, we are sure that it contains all the data for the set of groups encapsulated in it. So we can safely generate the output, flush the builder, and move on to the next page.
This is what enables us to process data that initially did not fit in memory.
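The two stages above can be sketched as follows (hypothetical names; the real code streams Pages from spill files with a k-way merge, and a merged page may hold several whole groups — here each chunk holds exactly one group for simplicity):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Stage 1: merge rows from spill "files" (lists sorted by groupId) and cut the
// merged stream at group boundaries, so each chunk holds ALL rows for its
// group. Stage 2: compute the final top N for a chunk and flush immediately,
// which is safe precisely because the chunk is group-complete.
public class UnspillSketch {
    // rows are {groupId, value}
    public static List<List<long[]>> mergeByGroup(List<List<long[]>> spillFiles) {
        List<long[]> merged = new ArrayList<>();
        for (List<long[]> file : spillFiles) {
            merged.addAll(file);
        }
        merged.sort(Comparator.comparingLong(r -> r[0])); // stand-in for the streaming merge sort
        List<List<long[]>> chunks = new ArrayList<>();
        List<long[]> current = new ArrayList<>();
        for (long[] row : merged) {
            if (!current.isEmpty() && current.get(current.size() - 1)[0] != row[0]) {
                chunks.add(current); // group boundary: close the chunk
                current = new ArrayList<>();
            }
            current.add(row);
        }
        if (!current.isEmpty()) {
            chunks.add(current);
        }
        return chunks;
    }

    // Top N values of one group-complete chunk, largest first.
    public static List<Long> topN(List<long[]> chunk, int n) {
        PriorityQueue<Long> heap = new PriorityQueue<>();
        for (long[] row : chunk) {
            heap.add(row[1]);
            if (heap.size() > n) {
                heap.poll();
            }
        }
        List<Long> result = new ArrayList<>(heap);
        result.sort(Comparator.reverseOrder());
        return result;
    }
}
```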

When is the spill triggered?

The spill of data can be triggered in 2 cases:

  1. When we are collecting input (addInput()) and we exceed the revoke-memory threshold. This triggers the revoke flow.
  2. When getOutput() is called and the operator attempts to move the input collected so far from revocable memory to user memory to create output pages, and this fails because the collected input is too large to fit in user memory.
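The two trigger conditions can be sketched as simple predicates (illustrative only; the real operator works with memory contexts and session properties, not raw byte counts):

```java
// Sketch of the two spill triggers described above; names are hypothetical.
public class SpillTriggerSketch {
    // Trigger 1: during addInput(), revocable memory crosses the revoke threshold.
    public static boolean shouldRevokeDuringInput(long revocableBytes, long revokeThresholdBytes) {
        return revocableBytes > revokeThresholdBytes;
    }

    // Trigger 2: during getOutput(), the accumulated input cannot be migrated
    // from revocable memory to user memory because it does not fit.
    public static boolean shouldSpillDuringOutput(long accumulatedBytes, long userMemoryLimitBytes) {
        return accumulatedBytes > userMemoryLimitBytes;
    }
}
```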

A picture is worth a thousand words :)

[architecture diagram]

Test Plan

== RELEASE NOTES ==

General Changes
* ...
* ...

Hive Changes
* ...
* ...

@shrinidhijoshi shrinidhijoshi requested a review from a team as a code owner September 25, 2022 14:36
@shrinidhijoshi shrinidhijoshi marked this pull request as draft September 25, 2022 14:36
@shrinidhijoshi shrinidhijoshi changed the title Spillable topn operator c [TopNOperator-Spill] [Part3] Add SpillableGroupedTopNBuilder Sep 25, 2022
@shrinidhijoshi shrinidhijoshi changed the title [TopNOperator-Spill] [Part3] Add SpillableGroupedTopNBuilder [TopNOperator/TopNRowNumberOperator-Spill] Add SpillableGroupedTopNBuilder Sep 30, 2022
@shrinidhijoshi shrinidhijoshi force-pushed the SpillableTopnOperator-C branch 8 times, most recently from 68135be to 9abc21d Compare October 4, 2022 09:38
@shrinidhijoshi shrinidhijoshi changed the title [TopNOperator/TopNRowNumberOperator-Spill] Add SpillableGroupedTopNBuilder Implement Spilling for Topn/TopnRowNumber Operator Oct 4, 2022
@shrinidhijoshi shrinidhijoshi changed the title Implement Spilling for Topn/TopnRowNumber Operator Implement Spilling for TopNRowNumber Operator Oct 4, 2022
@shrinidhijoshi shrinidhijoshi force-pushed the SpillableTopnOperator-C branch 5 times, most recently from eb63df0 to f949637 Compare October 5, 2022 06:50
@shrinidhijoshi shrinidhijoshi force-pushed the SpillableTopnOperator-C branch from aa0a439 to 270c096 Compare October 13, 2022 03:27
@shrinidhijoshi shrinidhijoshi force-pushed the SpillableTopnOperator-C branch from 270c096 to 6a83a7f Compare October 26, 2022 22:09
@rschlussel
Contributor

Thanks for the detailed algorithm description.

Question, and please correct me if I'm not understanding this right: without spill we keep the top-n elements for k groups in memory at a time, so the maximum memory it would use is k * n. With this spill approach, we process one group at a time, but generate a page for all of the values in each group, so the maximum memory will be the size of the largest group. If some group is very large compared to k * n, you could actually end up using more memory with spill. Is that correct? Or is it the case that because it's pipelined, you don't actually keep the whole group in memory: you have one page of that group in memory, it goes to the heap so you only hold the top n, and then you process more pages from the group?

I was basically wondering:

  1. Do we understand what the cases are where we would expect this to fail too?
  2. Does it make sense to sort by both the grouping key and the ordering keys, so that when you merge the pages back you only need to keep the top n?

@shrinidhijoshi
Collaborator Author

This is a very good point.
To answer your questions:

Do we understand if any are the cases where we would expect this to fail too?

Yes. Your suspicion is right. I can see a case, based on the current implementation, where this would still fail, because the output of the merge sort buffers full groups into pages and does not let groups span across pages.

you could actually end up using more memory with spill. Is that correct?

Yes, because we now have a worst case of M (spill files) × N values for that group (instead of just N) to accommodate in memory. But note that, in this case, if we didn't use spill and tried to do it all in memory, we would surely fail.
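A back-of-envelope version of this worst case (illustrative arithmetic, not PR code): without spill a group contributes at most N rows to memory, while during unspill a group scattered across M spill files can surface M × N rows at once.

```java
// Worst-case row counts for the two paths; K = groups, N = top-n, M = spill files.
public class WorstCaseSketch {
    // In-memory path: K heaps of at most N rows each.
    public static long inMemoryWorstCaseRows(long groups, long n) {
        return groups * n;
    }

    // Unspill path, single group: up to N rows from each of the M spill files
    // must be merged before the group's final top N can be computed.
    public static long unspillWorstCaseRowsForOneGroup(long spillFiles, long n) {
        return spillFiles * n;
    }
}
```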

Does it make sense to sort by both the grouping key and the ordering keys, so that when you merge the pages back you only need to keep the top n?

Are you suggesting that after we sort on write, we can skip on read to reduce memory pressure? IMO, a simpler approach is to switch from page pipelining to row pipelining, so that the topNBuilder does the sort and skip implicitly.

I am still trying to work out the conditions under which this would theoretically happen (the input TopN heaps should eliminate a lot of values). I guess if N is big and the data arrives in the input in such a way that it maximizes the number of spill files across which this big group is spread? I didn't find such cases in the production sample of 200 queries I looked at. Maybe I should look at a bigger sample.

@rschlussel
Contributor

I think I missed that it was stored on disk as a heap, so we never keep more than n values per group per file. So it sounds like without spill we run out of memory if there are a lot of groups (because we are maintaining so many heaps), with n also playing a role in how many heaps we can support.

With spill if you have a small or medium number of groups, but they are extremely large, would you end up with a lot of files for every group so that you could use more memory with spill than without?

It sounds like definitely for a large number of groups and a lot of data in each group, it would fail both with and without spill with OOM.

@shrinidhijoshi
Collaborator Author

With spill if you have a small or medium number of groups, but they are extremely large, would you end up with a lot of files for every group so that you could use more memory with spill than without?

Yes, that is correct. So this would be another case where the in-memory and spill versions would both fail.

@rschlussel
Contributor

With spill if you have a small or medium number of groups, but they are extremely large, would you end up with a lot of files for every group so that you could use more memory with spill than without?

Yes that is correct. So this would be another case where the in-memory and spill version would both fail

A small number of large groups wouldn't necessarily fail without spill, because the amount of memory doesn't grow with the size of the group. However, if spill gets triggered for some reason, it could fail with OOM: because each group is large, it will use more memory.

@shrinidhijoshi
Collaborator Author

shrinidhijoshi commented Oct 27, 2022

Ah yes. I was under the impression that up until you exceeded local memory limits, the in-memory and spill paths are effectively the same. But now that I have looked at the operator revoke logic, I see what you mean.

Essentially, when there is revocable memory pressure, regardless of which operator is causing it, all operators are spilled until the pressure is tolerable. This makes the spill and in-memory paths quite different, even if the operator is using very little memory. Interesting...

@shrinidhijoshi
Collaborator Author

shrinidhijoshi commented Oct 27, 2022

I wonder if other operators that use a similar approach (for example, HashAggregationOperator/SpillableHashAggregationBuilder) are also susceptible to the same behavior.

For example, if we are doing a distinct operation, then it would consume more memory if we kept spilling all the intermediate hash tables and then tried to read them back, because there too the merge-sort output tries to buffer until all the values for the group have been read - https://github.com/prestodb/presto/blob/master/presto-main/src/main/java/com/facebook/presto/operator/MergeHashSort.java#L62

This could be a good opportunity to fix this in all such instances.

@shrinidhijoshi shrinidhijoshi force-pushed the SpillableTopnOperator-C branch from 274d443 to eb3a7f8 Compare December 22, 2022 21:10
@shrinidhijoshi shrinidhijoshi force-pushed the SpillableTopnOperator-C branch 4 times, most recently from 2df1bdb to a2db98d Compare January 3, 2023 23:19
Contributor

@rschlussel rschlussel left a comment

looks good aside from adding the missing requireNonNull checks

DriverYieldSignal driverYieldSignal,
SpillerFactory spillerFactory)
{
this.inputInMemoryGroupedTopNBuilderSupplier = inputInMemoryGroupedTopNBuilderSupplier;
Contributor

can you add requireNonNull checks for all the non-primitive fields
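The requested pattern is the standard `java.util.Objects.requireNonNull` guard used throughout the Presto codebase; the field and type names below are illustrative, not the PR's actual constructor:

```java
import java.util.function.Supplier;

import static java.util.Objects.requireNonNull;

// Illustrative constructor with requireNonNull checks on non-primitive fields,
// failing fast with a descriptive message instead of a later NPE.
public class SpillableBuilderSketch {
    private final Supplier<Object> builderSupplier;
    private final Object spillerFactory;

    public SpillableBuilderSketch(Supplier<Object> builderSupplier, Object spillerFactory) {
        this.builderSupplier = requireNonNull(builderSupplier, "builderSupplier is null");
        this.spillerFactory = requireNonNull(spillerFactory, "spillerFactory is null");
    }
}
```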

Collaborator Author

Will do

@shrinidhijoshi shrinidhijoshi force-pushed the SpillableTopnOperator-C branch from fa7c974 to 0ae39f0 Compare January 5, 2023 16:14
@rschlussel
Contributor

If you squash the commits together i can merge it.

@shrinidhijoshi
Collaborator Author

@rschlussel Thanks! I am running another round of regression tests. Will comment here when it's done.

@shrinidhijoshi
Collaborator Author

@rschlussel I have found the cause of the memory leak discovered during final testing. I have also fixed it. But I still see about 10% of the cases OOMing. Working on fixing that.

@rschlussel
Contributor

when you finish, i'd love to learn how you debugged it.

@shrinidhijoshi shrinidhijoshi force-pushed the SpillableTopnOperator-C branch 2 times, most recently from cde521a to 3d883bf Compare January 13, 2023 21:33
@shrinidhijoshi
Collaborator Author

@rschlussel Fixed the issues and re-tested the following setups

Based on these test results, I believe we are good for final review and merge.

@rschlussel
Contributor

@shrinidhijoshi did you look at the column and row count mismatches from this test: https://www.internalfb.com/intern/presto/verifier/results/?test_id=101281. I want to be sure we're not introducing a correctness bug.

@shrinidhijoshi
Collaborator Author

@rschlussel When I looked into the errors in other suites, it looked like it's due to non-determinism, but the verifier isn't recognizing it as such. I will do the same validation for this suite (https://www.internalfb.com/intern/presto/verifier/results/?test_id=101281) as well.

I have submitted a new suite of these COLUMN_MISMATCH failing queries, with test and control both on stable, to check if these are also non-deterministic and are being wrongly classified as deterministic. Will update once it's done.

@shrinidhijoshi
Collaborator Author

shrinidhijoshi commented Jan 20, 2023

@rschlussel Looks like the column mismatch cases now succeed:
https://www.internalfb.com/intern/presto/verifier/results/?test_id=101567.
There is definitely some flakiness/unreliability in the verifier here, as I am not able to explain this behavior.
Either the test cases should consistently pass or consistently fail (with non-determinism as the cause).
I submitted 2 runs:

  1. control - stable, test - stable - ALL SUCCEED
  2. control - stable, test - 1d4e4be - ALL SUCCEED (https://our.internmc.facebook.com/intern/presto/verifier/results/?test_id=101567)

Any ideas?

@rschlussel
Contributor

Looking back at the failures, the determinism check failed, so it marked the tests as failed since it couldn't tell whether the test was deterministic. I guess it was non-deterministic.

Contributor

@rschlussel rschlussel left a comment

can you explain what was the root cause of the crashes you were seeing? Was it the missing updateMemoryReservation calls or some combination of things?

Contributor

when would the inMemoryGroupedTopNBuilder be not empty but revocable memory be > 0?

Collaborator Author

Checking localRevocableMemoryContext tells us whether we are inside the buildResult and migrateMemoryContext functions, which means the current input is either going to be returned or spilled. So this startRevokingThread doesn't need to take any action on the same input (specifically, it must not trigger another spill).

Contributor

Want to make sure I understand what's going on here - it looks like previously we would build the result and switch to user memory as long as inputInMemoryGroupedTopNBuilder fit in the user memory, and now we only do it if we haven't started spilling. Is that correct?

Collaborator Author

Context to understand this change: the current design is that, if we have previously spilled input, then we will spill the last accumulated input regardless of whether it fits into user memory or not.

This code change just implements the accounting for that design cleanly: if we have previously spilled, do not try to unnecessarily move to user memory, because we are going to spill it anyway.

Contributor

@rschlussel rschlussel Jan 20, 2023

So this wasn't causing the OOM issues, right, because we would have failed? It was just an additional thing that needed to be fixed (though also related to memory management).

Collaborator Author

Yes that's correct.

@shrinidhijoshi
Collaborator Author

shrinidhijoshi commented Jan 20, 2023

can you explain what was the root cause of the crashes you were seeing? Was it the missing updateMemoryReservation calls or some combination of things?

This was an interesting one. From what I understood, the way memory accounting works inside the Spiller is imperfect. For example, the spill call takes an iterator over a list of pages to spill, but during the course of spilling, the caller of spill is expected to account for the memory of those pages. So in this case, it is the spillable builder's responsibility to ensure that the currently-spilling pages are accounted for in revocable memory until the spill finishes.
I had made a code change along the way where I moved the updateMemoryReservation call into the spill function itself to make the code more DRY. Combined with the fact that, when we trigger a spill, we let go of the current in-memory builder and instantiate a new one, calling updateMemoryReservation would reset the revocable memory to the new builder's size (empty/0).
This led to a scenario where effectively no one was accounting for the pages being spilled, underestimating the currently used JVM memory and causing the process to be killed with exit code 137.
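The accounting invariant described above can be sketched with a counter (illustrative only, not Presto code; the real implementation uses LocalMemoryContext): the reservation must keep covering the pages handed to the spiller, so resetting it to the size of the new, empty builder mid-spill under-counts memory.

```java
import java.util.concurrent.atomic.AtomicLong;

// Sketch of the fixed accounting: the reservation covers the live builder AND
// any pages still being spilled, until the spill finishes.
public class SpillAccountingSketch {
    final AtomicLong revocableReservation = new AtomicLong();
    long builderBytes;
    long spillingPagesBytes;

    void updateMemoryReservation() {
        // Correct: include the in-flight spill pages, not just the builder.
        revocableReservation.set(builderBytes + spillingPagesBytes);
    }

    void startSpill() {
        spillingPagesBytes = builderBytes; // pages handed to the spiller
        builderBytes = 0;                  // fresh, empty builder
        updateMemoryReservation();         // reservation stays nonzero mid-spill
    }

    void finishSpill() {
        spillingPagesBytes = 0;            // spiller wrote everything to disk
        updateMemoryReservation();
    }
}
```

The bug was equivalent to setting the reservation to `builderBytes` alone inside `startSpill`, which drops to zero the moment a new builder is created even though the old pages are still in memory.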

@rschlussel
Contributor

just squash the commits together and it'll be good to merge.

@shrinidhijoshi
Collaborator Author

I further verified this by looking at other spilling operators. Once a spill is started, updateMemory is next called only in the finishMemoryRevoke function. This ensures that the caller (builder/operator) keeps accounting for, and holding onto, that memory.
Possibly we can simplify this by changing the interface of the spill function so that it takes ownership not only of the pages to spill but also of their memory accounting. Then the caller wouldn't need to do this complex coordination.

@shrinidhijoshi shrinidhijoshi force-pushed the SpillableTopnOperator-C branch from 3d883bf to 06e135c Compare January 20, 2023 18:16
@shrinidhijoshi
Collaborator Author

Done
