[SPARK-32350][FOLLOW-UP] Fix count update issue and partition the value list to a set of small batches for LevelDB writeAll #29425
Conversation
Test build #127415 has finished for PR 29425 at commit
// Partition the value list into a set of 128-value batches. This reduces the
// memory pressure caused by serialization and gives fairness to other writing
// threads when writing a very large list.
for (List<?> batchList : Iterables.partition(entry.getValue(), 128)) {
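As a quick illustration of the batching primitive used here, the following is a minimal, self-contained sketch of how Guava's Iterables.partition chunks a large list. The writeChunk() helper and the list contents are made up for the example; Guava must be on the classpath.

import com.google.common.collect.Iterables;

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class PartitionSketch {
  // Hypothetical stand-in for writing one chunk as its own LevelDB WriteBatch.
  static void writeChunk(List<Integer> chunk) {
    System.out.println("writing a batch of " + chunk.size() + " values");
  }

  public static void main(String[] args) {
    List<Integer> values = IntStream.range(0, 1000).boxed().collect(Collectors.toList());
    // 1000 values -> 7 chunks of 128 followed by one final chunk of 104.
    for (List<Integer> chunk : Iterables.partition(values, 128)) {
      writeChunk(chunk);
    }
  }
}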
I left a comment in #28412 (review). I think we should add unit test cases to verify that the code works as designed. We might need to update the implementation to expose private APIs for testing.
Sure, I will add unit tests.
Hi @gatorsmile, unit tests for HybridStore were added in #29509, and a unit test for the writeAll() method of LevelDB is added in the current PR.
try (WriteBatch batch = db().createWriteBatch()) {
  updateBatch(batch, valueIter.next(), serializedValueIter.next(), klass,
    naturalIndex, indices);
  db().write(batch);
@HeartSaVioR Here I put db().write(batch) inside the while loop, which means we call db().write() for every entity, similar to removeAllByIndexValues(). I made this change because, after adding the unit test, I found a bug: the count cannot be updated properly with the original writeAll() code. The count is updated in https://github.com/apache/spark/blob/master/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBTypeInfo.java#L395. Since the delta is overridden to 1 every time we add a new entity, we can only increase the count by 1 per db().write(batch) call, even though we write multiple entities in that call.
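To make the failure mode concrete, here is a tiny illustrative sketch (not the actual LevelDBTypeInfo code; names are made up): if each new entity overwrites the pending delta with 1 instead of accumulating it, a single write(batch) containing N entities bumps the count by only 1.

import java.util.HashMap;
import java.util.Map;

public class CountBugSketch {
  public static void main(String[] args) {
    Map<String, Long> pendingDeltas = new HashMap<>();
    for (int i = 0; i < 5; i++) {
      pendingDeltas.put("countKey", 1L);                // buggy: overwrites the delta
      // pendingDeltas.merge("countKey", 1L, Long::sum); // fixed: accumulates it
    }
    // Prints 1 although 5 entities were added to the batch.
    System.out.println(pendingDeltas.get("countKey"));
  }
}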
Thanks for the information. Sounds like ideally we should fix the bug in LevelDBTypeInfo instead. Sorry, I haven't had time to look into the details. If possible we'd better fix it there; if that's not feasible, please let me know.
Sounds good, I will try to fix it.
Hi @HeartSaVioR, I fixed the bug by using a hash map to store and update the <countKey, delta> pairs for a batch, and using that map to update the counts right before calling db.write(batch). I updated the test case to cover the count update issue. I also ran benchmark tests for writeAll and put the results in the description section of this PR.
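A minimal, self-contained sketch of the pattern described above: deltas are accumulated per countKey in a map and flushed into the batch once, right before db.write(batch). Here flushCount() is a hypothetical stand-in for naturalIndex.updateCount(), and the key bytes are made up.

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

public class CountFixSketch {
  // Hypothetical stand-in for putting one count update into the WriteBatch.
  static void flushCount(ByteBuffer key, long delta) {
    System.out.println("countKey delta = " + delta);
  }

  public static void main(String[] args) {
    Map<ByteBuffer, Long> counts = new HashMap<>();
    ByteBuffer countKey = ByteBuffer.wrap("sometype/__main__".getBytes());
    for (int i = 0; i < 128; i++) {
      counts.merge(countKey, 1L, Long::sum);  // accumulate instead of overwrite
    }
    for (Map.Entry<ByteBuffer, Long> e : counts.entrySet()) {
      flushCount(e.getKey(), e.getValue());   // one count update per key per batch
    }
  }
}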
Test build #127897 has finished for PR 29425 at commit
    naturalIndex, indices, counts);
}
for (Map.Entry<ByteBuffer, Long> countEntry : counts.entrySet()) {
  naturalIndex.updateCount(batch, countEntry.getKey().array(), countEntry.getValue());
@HeartSaVioR Here I use naturalIndex.updateCount() to put the count information for all indexes into the batch. While implementing this, I found we could lift the methods updateCount() and long getCount(byte[] key) from LevelDBTypeInfo.Index to LevelDBTypeInfo, since these methods do not access any member of LevelDBTypeInfo.Index. Doing that would allow us to call ti.updateCount() to update the count for all indexes, which would make more sense. However, it's entirely optional; a sketch of the idea follows.
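A hedged sketch of that optional refactor: since updateCount() and getCount() only touch the backing store through the key they are given, they can live on the outer type-info class rather than the inner Index class. The String-keyed map below is a stand-in for LevelDB; the real methods also take a WriteBatch and (de)serialize the long value.

import java.util.HashMap;
import java.util.Map;

public class LiftedCountSketch {
  private final Map<String, Long> store = new HashMap<>(); // stand-in for LevelDB

  long getCount(byte[] key) {
    return store.getOrDefault(new String(key), 0L);
  }

  void updateCount(byte[] key, long delta) {
    long updated = getCount(key) + delta;
    if (updated > 0) {
      store.put(new String(key), updated);  // write the new count
    } else {
      store.remove(new String(key));        // drop the entry when it reaches zero
    }
  }

  public static void main(String[] args) {
    LiftedCountSketch ti = new LiftedCountSketch();
    ti.updateCount("countKey".getBytes(), 128);              // e.g. one 128-value chunk
    System.out.println(ti.getCount("countKey".getBytes()));  // prints 128
  }
}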
Test build #128168 has finished for PR 29425 at commit
I'm not 100% sure I'm qualified to review this patch, as it touches the details of the LevelDB KV store implementation. Let me cc @mridulm and @vanzin again. I'll try to find time to look into the details of LevelDB and review again if @mridulm and @vanzin aren't available in the coming days.
Hi @HeartSaVioR, I would like to know if there are any updates on the review of this PR. Thanks!
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. |
I came across this PR by chance; it contains an underlying correctness fix. Also cc @mridulm
What changes were proposed in this pull request?
This is a follow-up to #29149. When writing a large value list, we now divide it into a set of smaller 128-value batches and bulk-write these batches one by one. This improvement reduces the memory pressure caused by serialization and gives fairness to other writing threads. The idea was proposed by @mridulm in #29149 (comment).
Update 9/1: After adding the unit test, I found that the count of objects was not updated properly when using writeAll. This PR also fixes the issue by using a hash map to temporarily store and update the delta for each countKey, and putting each countKey with its delta into the WriteBatch before calling db.write(batch).
Why are the changes needed?
Writing a very large value list as a single batch causes high memory pressure from serialization and can starve other writing threads; chunking the writes avoids both problems. The count fix is needed so that the stored count stays correct when multiple entities are written in one batch.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Unit test and manual test.
A test case testWriteAll() is added in LevelDBSuite.java.
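For flavor, here is a hedged, self-contained sketch of the kind of invariant such a test checks. The real testWriteAll() in LevelDBSuite.java runs against an actual LevelDB store; the map-backed fake store and all names below are made up for illustration, Guava is assumed on the classpath, and the assert needs the -ea JVM flag.

import com.google.common.collect.Iterables;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class WriteAllCheckSketch {
  private final Map<Integer, String> store = new HashMap<>();
  private long count = 0;

  void writeAll(List<String> values) {
    int next = store.size();
    for (List<String> chunk : Iterables.partition(values, 128)) {
      long delta = 0;             // accumulate per chunk, as in the fix
      for (String v : chunk) {
        store.put(next++, v);
        delta++;
      }
      count += delta;             // one count update per chunk
    }
  }

  public static void main(String[] args) {
    WriteAllCheckSketch s = new WriteAllCheckSketch();
    List<String> values = new ArrayList<>();
    for (int i = 0; i < 1024; i++) values.add("value-" + i);
    s.writeAll(values);           // more values than one 128-element chunk
    assert s.store.size() == 1024 && s.count == 1024 : "count update is broken";
    System.out.println("count = " + s.count); // 1024, not one per chunk
  }
}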
I compared some write-related benchmarks in LevelDBBenchmark.java between writeAll() and write(). The tests were conducted on an HDD disk while the disk was not busy.