-
Notifications
You must be signed in to change notification settings - Fork 1.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use StreamWriter in bulk loader #3542
Conversation
… key sorted order changes due to version append in Badger.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 4 of 8 files at r1, 20 of 21 files at r2.
Reviewable status: all files reviewed, 9 unresolved discussions (waiting on @mangalaman93 and @manishrjain)
dgraph/cmd/bulk/reduce.go, line 47 at r2 (raw file):
func (r *reducer) run() error { shardDirs := shardDirs(r.opt.TmpDir)
minor: having the variable have the same name as the function makes this a bit confusing. Maybe the method can be named something like getShardDirs
or the variable can be simply named dirs
.
dgraph/cmd/bulk/reduce.go, line 68 at r2 (raw file):
writer := db.NewStreamWriter() if err := writer.Prepare(); err != nil { panic(err)
why are you using panic here instead of x.Check like in some other places?
dgraph/cmd/bulk/reduce.go, line 213 at r2 (raw file):
keyChanged := !bytes.Equal(prevKey, me.Key) if keyChanged && plistLen > 0 {
I am assuming the keys are returned in order so when the key changes we are done with the count for this key. Is that right?
If so, maybe adding a small comment explaining this invariant would be helpful for future readers of the code.
Bug fix for bulk loader changes introduced in #3542. Fixes #3607. Signed-off-by: பாலாஜி ஜின்னா <[email protected]>
This PR switched transaction based writes to Badger to StreamWriter and brings in Badger master into vendor. This PR also refactors bulk loader code as follows: - Simplify shuffler and reducer code and merge them into one, i.e. reducer. - Remove shuffler.go file. - Remove metrics.go file. - The channel based heap merge was expensive. Switched that with a simple map entries iterator. With these changes, the 21M dataset now takes 2 mins to load from the original 3 mins. Changes: * Simplified shuffler and reducer code. But, encountered an issue where key sorted order changes due to version append in Badger. * Working code after StreamWriter integration. * Vendor Badger in, because it contains fixes to StreamWriter. * Fix build breakages caused by importing Badger.
Bug fix for bulk loader changes introduced in hypermodeinc#3542. Fixes hypermodeinc#3607. Signed-off-by: பாலாஜி ஜின்னா <[email protected]>
This PR switched transaction based writes to Badger to StreamWriter.
This PR also refactors bulk loader code as follows:
With these changes, the 21M dataset now takes 2 mins to load from the original 3 mins.
This change is