Add partition key based iterator #4841
Signed-off-by: balaji <[email protected]>
…ures that we don't have too many pending requests.
…encoder. We need a way to parse key easily for comparisons though.
…aji/new_reduce Signed-off-by: balaji <[email protected]>
Looks alright to me. Have some comments. Will defer to @animesh2049 to do the final review and LGTM.
Reviewed 1 of 5 files at r2, 1 of 3 files at r3, 7 of 8 files at r5.
Reviewable status: all files reviewed, 15 unresolved discussions (waiting on @balajijinnah)
dgraph/cmd/bulk/key.go, line 1 at r5 (raw file):
package bulk
Add a license, but use the license that protobuf generated code has (or protobuf has).
dgraph/cmd/bulk/mapper.go, line 129 at r5 (raw file):
    PartitionKeys: [][]byte{},
    }
    shardPartioionNo := len(entries) / partitionKeyShard
spelling mistake
dgraph/cmd/bulk/mapper.go, line 132 at r5 (raw file):
    for i := range entries {
        if shardPartioionNo == 0 {
            // we have only less entries so no need for partition keys.
we have very few ...
dgraph/cmd/bulk/mapper.go, line 144 at r5 (raw file):
    lenBuf := make([]byte, 4)
    binary.BigEndian.PutUint32(lenBuf, uint32(len(headerBuf)))
    _, err = w.Write(lenBuf)
x.Check2(...)
dgraph/cmd/bulk/mapper.go, line 146 at r5 (raw file):
    _, err = w.Write(lenBuf)
    x.Check(err)
    _, err = w.Write(headerBuf)
x.Check2(...)
dgraph/cmd/bulk/reduce.go, line 89 at r5 (raw file):
    })
    // Start bactching for the given keys
spelling
dgraph/cmd/bulk/reduce.go, line 181 at r5 (raw file):
    // readKey reads the next map entry key.
    readKey := func() {
readKey := func() error {
...
return io.EOF
}
Don't need eof variable.
Also, rename to readMapEntry
dgraph/cmd/bulk/reduce.go, line 185 at r5 (raw file):
    return
    }
    if !prevKeyExist {
if prevKeyExist { return }
dgraph/cmd/bulk/reduce.go, line 202 at r5 (raw file):
    x.Check2(io.ReadFull(r, eBuf))
    key, err = GetKeyForMapEntry(eBuf)
return key, err
key, err = readMapEntry(...)
dgraph/cmd/bulk/reduce.go, line 217 at r5 (raw file):
    ie.partitionKey = pKey
    for {
        readKey()
if err := readKey(); err == io.EOF { break }
dgraph/cmd/bulk/reduce.go, line 231 at r5 (raw file):
    }
    mi.batchCh <- ie
    i++
Doesn't look like it's used.
dgraph/cmd/bulk/reduce.go, line 237 at r5 (raw file):
    batch := make([][]byte, 0, batchAlloc)
    for {
        readKey()
fix this up as well.
dgraph/cmd/bulk/reduce.go, line 257 at r5 (raw file):
    }
    func (mi *mapIterator) Next() bool {
Next() *iteratorEntry
Remove current(). If nothing is there, then return nil.
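To illustrate the suggested Next() *iteratorEntry signature: a single call that returns the next batch, or nil when exhausted, replaces a Next() bool plus current() pair. This sketch assumes a producer goroutine sends batches on a channel and closes it when done; the iteratorEntry fields shown are hypothetical stand-ins, not the PR's definitions.

```go
package main

import "fmt"

// iteratorEntry is a hypothetical stand-in for the PR's batched entry type.
type iteratorEntry struct {
	partitionKey []byte
	batch        [][]byte
}

// mapIterator sketches an iterator whose producer sends batches on batchCh
// and closes the channel when the map file is exhausted.
type mapIterator struct {
	batchCh chan *iteratorEntry
}

// Next returns the next batch, or nil once the producer has closed batchCh.
func (mi *mapIterator) Next() *iteratorEntry {
	ie, ok := <-mi.batchCh
	if !ok {
		return nil
	}
	return ie
}

func main() {
	mi := &mapIterator{batchCh: make(chan *iteratorEntry, 2)}
	mi.batchCh <- &iteratorEntry{partitionKey: []byte("m"), batch: [][]byte{[]byte("a")}}
	mi.batchCh <- &iteratorEntry{batch: [][]byte{[]byte("x"), []byte("z")}}
	close(mi.batchCh)
	// The caller needs no separate current() accessor.
	for ie := mi.Next(); ie != nil; ie = mi.Next() {
		fmt.Println(len(ie.batch))
	}
}
```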
dgraph/cmd/bulk/reduce.go, line 374 at r5 (raw file):
    go r.encode(encoderCh, encoderCloser)
    }
    // Start lisenting to write the badger list.
spelling mistake.
dgraph/cmd/bulk/reduce.go, line 380 at r5 (raw file):
    for i := 0; i < len(partitionKeys); i++ {
        batch := make([][]byte, 0, batchAlloc)
        for _, itr := range mapItrs {
Let's add the assert back.
Reviewable status: 6 of 9 files reviewed, 15 unresolved discussions (waiting on @manishrjain)
dgraph/cmd/bulk/key.go, line 1 at r5 (raw file):
Previously, manishrjain (Manish R Jain) wrote…
Add a license, but use the license that protobuf generated code has (or protobuf has).
Done.
dgraph/cmd/bulk/mapper.go, line 129 at r5 (raw file):
Previously, manishrjain (Manish R Jain) wrote…
spelling mistake
Done.
dgraph/cmd/bulk/mapper.go, line 132 at r5 (raw file):
Previously, manishrjain (Manish R Jain) wrote…
we have very few ...
Done.
dgraph/cmd/bulk/mapper.go, line 144 at r5 (raw file):
Previously, manishrjain (Manish R Jain) wrote…
x.Check2(...)
Done.
dgraph/cmd/bulk/mapper.go, line 146 at r5 (raw file):
Previously, manishrjain (Manish R Jain) wrote…
x.Check2(...)
Done.
dgraph/cmd/bulk/reduce.go, line 89 at r5 (raw file):
Previously, manishrjain (Manish R Jain) wrote…
spelling
Done.
dgraph/cmd/bulk/reduce.go, line 181 at r5 (raw file):
Previously, manishrjain (Manish R Jain) wrote…
readKey := func() error {
...
return io.EOF
}
Don't need eof variable.
Also, rename to readMapEntry
Done.
dgraph/cmd/bulk/reduce.go, line 185 at r5 (raw file):
Previously, manishrjain (Manish R Jain) wrote…
if prevKeyExist { return }
Done.
dgraph/cmd/bulk/reduce.go, line 202 at r5 (raw file):
Previously, manishrjain (Manish R Jain) wrote…
return key, err
key, err = readMapEntry(...)
Done.
dgraph/cmd/bulk/reduce.go, line 217 at r5 (raw file):
Previously, manishrjain (Manish R Jain) wrote…
if err := readKey(); err == io.EOF { break }
Done.
dgraph/cmd/bulk/reduce.go, line 231 at r5 (raw file):
Previously, manishrjain (Manish R Jain) wrote…
Doesn't look like it's used.
Done.
dgraph/cmd/bulk/reduce.go, line 237 at r5 (raw file):
Previously, manishrjain (Manish R Jain) wrote…
fix this up as well.
Done.
dgraph/cmd/bulk/reduce.go, line 257 at r5 (raw file):
Previously, manishrjain (Manish R Jain) wrote…
Next() *iteratorEntry
Remove current(). If nothing is there, then return nil.
Done.
dgraph/cmd/bulk/reduce.go, line 374 at r5 (raw file):
Previously, manishrjain (Manish R Jain) wrote…
spelling mistake.
Done.
dgraph/cmd/bulk/reduce.go, line 380 at r5 (raw file):
Previously, manishrjain (Manish R Jain) wrote…
Let's add the assert back.
Done.
dgraph/cmd/bulk/reduce.go (outdated)
    tmpBuf  []byte
    fd      *os.File
    reader  *bufio.Reader
    current *iteratorEntry
field current is unused (from unused)
Reviewed 1 of 5 files at r2, 3 of 8 files at r5, 3 of 3 files at r6.
Reviewable status: all files reviewed, 21 unresolved discussions (waiting on @balajijinnah, @golangcibot, and @manishrjain)
dgraph/cmd/bulk/loader.go, line 235 at r6 (raw file):
    r := reducer{
        state: ld.state,
        mu:    new(sync.RWMutex),
If you make mu not a pointer, you won't have to worry about initializing it.
Also, maybe add a method NewReducer to encapsulate the initialization. Not needed if it's only happening once, but it makes the code a bit cleaner.
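A minimal sketch of both suggestions, using hypothetical names (loaderState, counts) in place of the PR's real fields: a sync.RWMutex stored by value is usable at its zero value, so no new(sync.RWMutex) is needed, and a NewReducer constructor keeps initialization in one place. Returning *reducer matters here, because a struct containing a mutex must not be copied after first use.

```go
package main

import (
	"fmt"
	"sync"
)

// loaderState is a hypothetical stand-in for the loader state passed in.
type loaderState struct{ numShards int }

// reducer holds its RWMutex by value; the zero value is an unlocked mutex.
type reducer struct {
	state  *loaderState
	mu     sync.RWMutex
	counts map[string]int
}

// NewReducer is a hypothetical constructor that centralizes initialization.
func NewReducer(s *loaderState) *reducer {
	return &reducer{state: s, counts: make(map[string]int)}
}

func (r *reducer) incr(key string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.counts[key]++
}

func (r *reducer) get(key string) int {
	r.mu.RLock()
	defer r.mu.RUnlock()
	return r.counts[key]
}

func main() {
	r := NewReducer(&loaderState{numShards: 1})
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			r.incr("k")
		}()
	}
	wg.Wait()
	fmt.Println(r.get("k")) // prints 10
}
```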
dgraph/cmd/bulk/reduce.go, line 154 at r6 (raw file):
Previously, golangcibot (Bot from GolangCI) wrote…
field current is unused (from unused)
I think you stopped using current but didn't remove the definition.
dgraph/cmd/bulk/reduce.go, line 172 at r6 (raw file):
    }
    func (mi *mapIterator) startBatchingForKeys(partitionsKeys [][]byte) {
Does renaming this to startBatching still convey the same idea? The current name is a bit too long. startBatchingKeys could also work.
dgraph/cmd/bulk/reduce.go, line 221 at r6 (raw file):
so,
can remove the comma here.
dgraph/cmd/bulk/reduce.go, line 244 at r6 (raw file):
    }
    }
    func (mi *mapIterator) Close() error {
add a blank line above
dgraph/cmd/bulk/reduce.go, line 315 at r6 (raw file):
    req.list = &bpb.KVList{}
    countKeys := r.toList(req.entries, req.list)
    // r.toList(req) // contains entries, list and freelist struct.
why is this commented out?
Reviewable status: all files reviewed, 26 unresolved discussions (waiting on @balajijinnah, @golangcibot, and @manishrjain)
dgraph/cmd/bulk/mapper.go, line 135 at r6 (raw file):
    break
    }
    if i%shardPartitionNo == 0 {
I think it would be better to use (i+1)%shardPartitionNo; otherwise, the first entry will always become a partition key.
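A worked sketch of the off-by-one being discussed, assuming the names from the snippet above (entries sorted by key, partitionKeyShard as the target number of partitions). With 10 entries and partitionKeyShard = 3, shardPartitionNo is 3: the check i%3 == 0 picks indices 0, 3, 6 and 9, so the first entry is always a partition key, whereas (i+1)%3 == 0 picks indices 2, 5 and 8. pickPartitionKeys is a hypothetical function, not the PR's code.

```go
package main

import "fmt"

// pickPartitionKeys samples every shardPartitionNo-th key from sorted keys,
// using (i+1) so the first sampled key is not entry 0.
// ASSUMPTION: names mirror the snippet under review; the PR code may differ.
func pickPartitionKeys(keys [][]byte, partitionKeyShard int) [][]byte {
	if partitionKeyShard <= 0 {
		return nil // guard against division by zero
	}
	shardPartitionNo := len(keys) / partitionKeyShard
	if shardPartitionNo == 0 {
		// We have very few entries, so no need for partition keys.
		return nil
	}
	var partitionKeys [][]byte
	for i := range keys {
		if (i+1)%shardPartitionNo == 0 {
			partitionKeys = append(partitionKeys, keys[i])
		}
	}
	return partitionKeys
}

func main() {
	var keys [][]byte
	for _, c := range "abcdefghij" {
		keys = append(keys, []byte(string(c)))
	}
	// prints: c f i
	for _, k := range pickPartitionKeys(keys, 3) {
		fmt.Print(string(k), " ")
	}
	fmt.Println()
}
```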
dgraph/cmd/bulk/reduce.go, line 90 at r6 (raw file):
    // Start batching for the given keys
    fmt.Printf("Num map iterators: %d\n", len(mapItrs))
Do you want to keep this?
dgraph/cmd/bulk/reduce.go, line 252 at r6 (raw file):
    }
    func (r *reducer) newMapIterator(filename string) (*pb.MapHeader, *mapIterator) {
Why is this a method on reducer?
dgraph/cmd/bulk/reduce.go, line 348 at r6 (raw file):
    idx++
    if idx%100 == 0 {
        fmt.Printf("Wrote req: %d\n", idx)
Do you want to keep this?
dgraph/cmd/bulk/reduce.go, line 359 at r6 (raw file):
    writerCh := make(chan *encodeRequest, 2*cpu)
    encoderCloser := y.NewCloser(cpu)
    for i := 0; i < runtime.NumCPU(); i++ {
Use the cpu variable.
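A sketch of the pattern the comment points at: compute the worker count once into cpu and use it for the channel capacity, the wait count, and the loop bound, so the three cannot drift apart. sync.WaitGroup is swapped in here for the y.Closer used in the PR, squaring each input is a placeholder for the real encoding work, and encodeAll is a hypothetical name.

```go
package main

import (
	"fmt"
	"runtime"
	"sort"
	"sync"
)

// encodeAll fans work out to one goroutine per CPU, reusing cpu everywhere.
// sync.WaitGroup stands in for the PR's y.Closer.
func encodeAll(inputs []int) []int {
	cpu := runtime.NumCPU()
	inCh := make(chan int, 2*cpu)
	outCh := make(chan int, len(inputs)) // buffered so workers never block
	var wg sync.WaitGroup
	wg.Add(cpu)
	for i := 0; i < cpu; i++ {
		go func() {
			defer wg.Done()
			for v := range inCh {
				outCh <- v * v // placeholder for the real encoding work
			}
		}()
	}
	for _, v := range inputs {
		inCh <- v
	}
	close(inCh)
	wg.Wait()
	close(outCh)
	var out []int
	for v := range outCh {
		out = append(out, v)
	}
	sort.Ints(out) // workers finish in arbitrary order
	return out
}

func main() {
	fmt.Println(encodeAll([]int{3, 1, 2})) // prints [1 4 9]
}
```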
Reviewable status: 6 of 9 files reviewed, 27 unresolved discussions (waiting on @animesh2049, @golangcibot, @manishrjain, and @martinmr)
dgraph/cmd/bulk/loader.go, line 235 at r6 (raw file):
Previously, martinmr (Martin Martinez Rivera) wrote…
If you make mu not a pointer, you won't have to worry about initializing it.
Also, maybe add a method NewReducer to encapsulate the initialization. Not needed if it's only happening once, but it makes the code a bit cleaner.
Done.
dgraph/cmd/bulk/mapper.go, line 135 at r6 (raw file):
Previously, animesh2049 (Animesh Chandra Pathak) wrote…
I think it would be better to use (i+1)%shardPartitionNo; otherwise, the first entry will always become a partition key.
Done.
dgraph/cmd/bulk/reduce.go, line 90 at r6 (raw file):
Previously, animesh2049 (Animesh Chandra Pathak) wrote…
Do you want to keep this?
Done.
dgraph/cmd/bulk/reduce.go, line 154 at r6 (raw file):
Previously, martinmr (Martin Martinez Rivera) wrote…
I think you stopped using current but didn't remove the definition.
Done.
dgraph/cmd/bulk/reduce.go, line 172 at r6 (raw file):
Previously, martinmr (Martin Martinez Rivera) wrote…
Does renaming this to startBatching still convey the same idea? The current name is a bit too long. startBatchingKeys could also work.
Done.
dgraph/cmd/bulk/reduce.go, line 221 at r6 (raw file):
Previously, martinmr (Martin Martinez Rivera) wrote…
so,
can remove the comma here.
Done.
dgraph/cmd/bulk/reduce.go, line 244 at r6 (raw file):
Previously, martinmr (Martin Martinez Rivera) wrote…
add a blank line above
Done.
dgraph/cmd/bulk/reduce.go, line 252 at r6 (raw file):
Previously, animesh2049 (Animesh Chandra Pathak) wrote…
Why is this a method on reducer?
Done.
dgraph/cmd/bulk/reduce.go, line 315 at r6 (raw file):
Previously, martinmr (Martin Martinez Rivera) wrote…
why is this commented out?
Done.
dgraph/cmd/bulk/reduce.go, line 348 at r6 (raw file):
Previously, animesh2049 (Animesh Chandra Pathak) wrote…
Do you want to keep this?
Done.
dgraph/cmd/bulk/reduce.go, line 359 at r6 (raw file):
Previously, animesh2049 (Animesh Chandra Pathak) wrote…
Use the cpu variable.
Done.
Dismissed @balajijinnah from a discussion.
Reviewable status: 6 of 9 files reviewed, 22 unresolved discussions (waiting on @animesh2049, @golangcibot, @manishrjain, and @martinmr)
This PR removes the heap-based sorting. Instead, keys are batched in memory by partition, and each batch is sorted.