Skip to content

Commit

Permalink
Add a limit to the size of the batches sent over a stream. (#1412)
Browse files Browse the repository at this point in the history
Currently, there's no limit to how many KVs can be included in a single
batch. This leads to issues when the size is over a hard limit. For
example, a batch of size > 2GB will cause issues if it has to be sent
over gRPC.

Co-authored-by: Ibrahim Jarif <[email protected]>
  • Loading branch information
martinmr and Ibrahim Jarif authored Jul 13, 2020
1 parent 8dd3fd9 commit c892251
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 16 deletions.
39 changes: 29 additions & 10 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ import (

const pageSize = 4 << 20 // 4MB

// maxStreamSize is the maximum allowed size of a stream batch. This is a soft limit
// as a single list that is still over the limit will have to be sent as is since it
// cannot be split further. This limit prevents the framework from creating batches
// so big that sending them causes issues (e.g running into the max size gRPC limit).
var maxStreamSize = uint64(100 << 20) // 100MB

// Stream provides a framework to concurrently iterate over a snapshot of Badger, pick up
// key-values, batch them up and call Send. Stream does concurrent iteration over many smaller key
// ranges. It does NOT send keys in lexicographical sorted order. To get keys in sorted
Expand Down Expand Up @@ -248,9 +254,29 @@ func (st *Stream) streamKVs(ctx context.Context) error {
defer t.Stop()
now := time.Now()

sendBatch := func(batch *pb.KVList) error {
sz := uint64(proto.Size(batch))
bytesSent += sz
count += len(batch.Kv)
t := time.Now()
if err := st.Send(batch); err != nil {
return err
}
st.db.opt.Infof("%s Created batch of size: %s in %s.\n",
st.LogPrefix, humanize.Bytes(sz), time.Since(t))
return nil
}

slurp := func(batch *pb.KVList) error {
loop:
for {
// Send the batch immediately if it already exceeds the maximum allowed size.
// If the size of the batch exceeds maxStreamSize, break from the loop to
// avoid creating a batch that is so big that certain limits are reached.
sz := uint64(proto.Size(batch))
if sz > maxStreamSize {
break loop
}
select {
case kvs, ok := <-st.kvChan:
if !ok {
Expand All @@ -262,16 +288,7 @@ func (st *Stream) streamKVs(ctx context.Context) error {
break loop
}
}
sz := uint64(proto.Size(batch))
bytesSent += sz
count += len(batch.Kv)
t := time.Now()
if err := st.Send(batch); err != nil {
return err
}
st.db.opt.Infof("%s Created batch of size: %s in %s.\n",
st.LogPrefix, humanize.Bytes(sz), time.Since(t))
return nil
return sendBatch(batch)
}

outer:
Expand All @@ -297,6 +314,8 @@ outer:
}
y.AssertTrue(kvs != nil)
batch = kvs

// Otherwise, slurp more keys into this batch.
if err := slurp(batch); err != nil {
return err
}
Expand Down
59 changes: 53 additions & 6 deletions stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,7 @@ func TestStream(t *testing.T) {
stream := db.NewStreamAt(math.MaxUint64)
stream.LogPrefix = "Testing"
c := &collector{}
stream.Send = func(list *bpb.KVList) error {
return c.Send(list)
}
stream.Send = c.Send

// Test case 1. Retrieve everything.
err = stream.Orchestrate(ctxb)
Expand Down Expand Up @@ -186,9 +184,7 @@ func TestStreamWithThreadId(t *testing.T) {
return stream.ToList(key, itr)
}
c := &collector{}
stream.Send = func(list *bpb.KVList) error {
return c.Send(list)
}
stream.Send = c.Send

err = stream.Orchestrate(ctxb)
require.NoError(t, err)
Expand All @@ -207,3 +203,54 @@ func TestStreamWithThreadId(t *testing.T) {
}
require.NoError(t, db.Close())
}

func TestBigStream(t *testing.T) {
// Set the maxStreamSize to 1MB for the duration of the test so that the it can use a smaller
// dataset than it would otherwise need.
originalMaxStreamSize := maxStreamSize
maxStreamSize = 1 << 20
defer func() {
maxStreamSize = originalMaxStreamSize
}()

testSize := int(1e6)
dir, err := ioutil.TempDir("", "badger-big-test")
require.NoError(t, err)
defer removeDir(dir)

db, err := OpenManaged(DefaultOptions(dir))
require.NoError(t, err)

var count int
wb := db.NewWriteBatchAt(5)
for _, prefix := range []string{"p0", "p1", "p2"} {
for i := 1; i <= testSize; i++ {
require.NoError(t, wb.SetEntry(NewEntry(keyWithPrefix(prefix, i), value(i))))
count++
}
}
require.NoError(t, wb.Flush())

stream := db.NewStreamAt(math.MaxUint64)
stream.LogPrefix = "Testing"
c := &collector{}
stream.Send = c.Send

// Test case 1. Retrieve everything.
err = stream.Orchestrate(ctxb)
require.NoError(t, err)
require.Equal(t, 3*testSize, len(c.kv), "Expected 30000. Got: %d", len(c.kv))

m := make(map[string]int)
for _, kv := range c.kv {
prefix, ki := keyToInt(kv.Key)
expected := value(ki)
require.Equal(t, expected, kv.Value)
m[prefix]++
}
require.Equal(t, 3, len(m))
for pred, count := range m {
require.Equal(t, testSize, count, "Count mismatch for pred: %s", pred)
}
require.NoError(t, db.Close())
}

0 comments on commit c892251

Please sign in to comment.