diff --git a/stream.go b/stream.go index 30814e70c..eb7341e49 100644 --- a/stream.go +++ b/stream.go @@ -32,6 +32,12 @@ import ( const pageSize = 4 << 20 // 4MB +// maxStreamSize is the maximum allowed size of a stream batch. This is a soft limit +// as a single list that is still over the limit will have to be sent as is since it +// cannot be split further. This limit prevents the framework from creating batches +// so big that sending them causes issues (e.g running into the max size gRPC limit). +var maxStreamSize = uint64(100 << 20) // 100MB + // Stream provides a framework to concurrently iterate over a snapshot of Badger, pick up // key-values, batch them up and call Send. Stream does concurrent iteration over many smaller key // ranges. It does NOT send keys in lexicographical sorted order. To get keys in sorted @@ -248,9 +254,29 @@ func (st *Stream) streamKVs(ctx context.Context) error { defer t.Stop() now := time.Now() + sendBatch := func(batch *pb.KVList) error { + sz := uint64(proto.Size(batch)) + bytesSent += sz + count += len(batch.Kv) + t := time.Now() + if err := st.Send(batch); err != nil { + return err + } + st.db.opt.Infof("%s Created batch of size: %s in %s.\n", + st.LogPrefix, humanize.Bytes(sz), time.Since(t)) + return nil + } + slurp := func(batch *pb.KVList) error { loop: for { + // Send the batch immediately if it already exceeds the maximum allowed size. + // If the size of the batch exceeds maxStreamSize, break from the loop to + // avoid creating a batch that is so big that certain limits are reached. + sz := uint64(proto.Size(batch)) + if sz > maxStreamSize { + break loop + } select { case kvs, ok := <-st.kvChan: if !ok { @@ -262,16 +288,7 @@ func (st *Stream) streamKVs(ctx context.Context) error { break loop } } - sz := uint64(proto.Size(batch)) - bytesSent += sz - count += len(batch.Kv) - t := time.Now() - if err := st.Send(batch); err != nil { - return err - } - st.db.opt.Infof("%s Created batch of size: %s in %s.\n", - st.LogPrefix, humanize.Bytes(sz), time.Since(t)) - return nil + return sendBatch(batch) } outer: @@ -297,6 +314,8 @@ outer: } y.AssertTrue(kvs != nil) batch = kvs + + // Otherwise, slurp more keys into this batch. if err := slurp(batch); err != nil { return err } diff --git a/stream_test.go b/stream_test.go index f60e3decb..f3c4b1d48 100644 --- a/stream_test.go +++ b/stream_test.go @@ -77,9 +77,7 @@ func TestStream(t *testing.T) { stream := db.NewStreamAt(math.MaxUint64) stream.LogPrefix = "Testing" c := &collector{} - stream.Send = func(list *bpb.KVList) error { - return c.Send(list) - } + stream.Send = c.Send // Test case 1. Retrieve everything. err = stream.Orchestrate(ctxb) @@ -186,9 +184,7 @@ func TestStreamWithThreadId(t *testing.T) { return stream.ToList(key, itr) } c := &collector{} - stream.Send = func(list *bpb.KVList) error { - return c.Send(list) - } + stream.Send = c.Send err = stream.Orchestrate(ctxb) require.NoError(t, err) @@ -207,3 +203,54 @@ func TestStreamWithThreadId(t *testing.T) { } require.NoError(t, db.Close()) } + +func TestBigStream(t *testing.T) { + // Set the maxStreamSize to 1MB for the duration of the test so that the it can use a smaller + // dataset than it would otherwise need. + originalMaxStreamSize := maxStreamSize + maxStreamSize = 1 << 20 + defer func() { + maxStreamSize = originalMaxStreamSize + }() + + testSize := int(1e6) + dir, err := ioutil.TempDir("", "badger-big-test") + require.NoError(t, err) + defer removeDir(dir) + + db, err := OpenManaged(DefaultOptions(dir)) + require.NoError(t, err) + + var count int + wb := db.NewWriteBatchAt(5) + for _, prefix := range []string{"p0", "p1", "p2"} { + for i := 1; i <= testSize; i++ { + require.NoError(t, wb.SetEntry(NewEntry(keyWithPrefix(prefix, i), value(i)))) + count++ + } + } + require.NoError(t, wb.Flush()) + + stream := db.NewStreamAt(math.MaxUint64) + stream.LogPrefix = "Testing" + c := &collector{} + stream.Send = c.Send + + // Test case 1. Retrieve everything. + err = stream.Orchestrate(ctxb) + require.NoError(t, err) + require.Equal(t, 3*testSize, len(c.kv), "Expected 30000. Got: %d", len(c.kv)) + + m := make(map[string]int) + for _, kv := range c.kv { + prefix, ki := keyToInt(kv.Key) + expected := value(ki) + require.Equal(t, expected, kv.Value) + m[prefix]++ + } + require.Equal(t, 3, len(m)) + for pred, count := range m { + require.Equal(t, testSize, count, "Count mismatch for pred: %s", pred) + } + require.NoError(t, db.Close()) +}