Skip to content

Commit

Permalink
Various bug fixes: Break up list and run DropAll func (#1439)
Browse files Browse the repository at this point in the history
* Run the returned func even on error, if the func is not nil.

* Break up keys from a single list output to avoid sending a 
HUGE batch which can exceed Grpc limits.
  • Loading branch information
manishrjain authored Jul 18, 2020
1 parent dfcca75 commit 1ccf3a8
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 22 deletions.
7 changes: 3 additions & 4 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -1557,11 +1557,10 @@ func (db *DB) prepareToDrop() (func(), error) {
// writes are paused before running DropAll, and resumed after it is finished.
func (db *DB) DropAll() error {
f, err := db.dropAll()
if err != nil {
return err
if f != nil {
f()
}
defer f()
return nil
return err
}

func (db *DB) dropAll() (func(), error) {
Expand Down
40 changes: 22 additions & 18 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,17 @@ func (st *Stream) produceKVs(ctx context.Context, threadId int) error {
streamId := atomic.AddUint32(&st.nextStreamId, 1)

outList := new(pb.KVList)

sendIt := func() error {
select {
case st.kvChan <- outList:
case <-ctx.Done():
return ctx.Err()
}
outList = new(pb.KVList)
size = 0
return nil
}
var prevKey []byte
for itr.Seek(kr.left); itr.Valid(); {
// it.Valid would only return true for keys with the provided Prefix in iterOpts.
Expand Down Expand Up @@ -202,30 +213,23 @@ func (st *Stream) produceKVs(ctx context.Context, threadId int) error {
if list == nil || len(list.Kv) == 0 {
continue
}
outList.Kv = append(outList.Kv, list.Kv...)
size += proto.Size(list)
if size >= pageSize {
for _, kv := range outList.Kv {
kv.StreamId = streamId
for _, kv := range list.Kv {
size += proto.Size(kv)
kv.StreamId = streamId
outList.Kv = append(outList.Kv, kv)

if size < pageSize {
continue
}
select {
case st.kvChan <- outList:
case <-ctx.Done():
return ctx.Err()
if err := sendIt(); err != nil {
return err
}
outList = new(pb.KVList)
size = 0
}
}
if len(outList.Kv) > 0 {
for _, kv := range outList.Kv {
kv.StreamId = streamId
}
// TODO: Think of a way to indicate that a stream is over.
select {
case st.kvChan <- outList:
case <-ctx.Done():
return ctx.Err()
if err := sendIt(); err != nil {
return err
}
}
return nil
Expand Down

0 comments on commit 1ccf3a8

Please sign in to comment.