Skip to content

Commit 58d0674

Browse files
authored
fix(stream): add managed mode check (#1712)
1 parent 275264c commit 58d0674

File tree

3 files changed

+42
-13
lines changed

3 files changed

+42
-13
lines changed

stream.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,8 @@ func (st *Stream) ToList(key []byte, itr *Iterator) (*pb.KVList, error) {
128128
}
129129
kv.Version = item.Version()
130130
kv.ExpiresAt = item.ExpiresAt()
131-
kv.Meta = []byte{item.meta}
131+
// As we do full copy, we need to transmit only if it is a delete key or not.
132+
kv.Meta = []byte{item.meta & bitDelete}
132133
kv.UserMeta = a.Copy([]byte{item.UserMeta()})
133134

134135
list.Kv = append(list.Kv, kv)

stream_writer.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -383,7 +383,11 @@ func (w *sortedWriter) handleRequests() {
383383
for i, e := range req.Entries {
384384
// If badger is running in InMemory mode, len(req.Ptrs) == 0.
385385
var vs y.ValueStruct
386-
if e.skipVlogAndSetThreshold(w.db.valueThreshold()) {
386+
// Sorted stream writer receives Key-Value (not a pointer to value). So, its upto the
387+
// writer (and not the sender) to determine if the Value goes to vlog or stays in SST
388+
// only. In managed mode, we do not write values to vlog and hence we would not have
389+
// req.Ptrs initialized.
390+
if w.db.opt.managedTxns || e.skipVlogAndSetThreshold(w.db.valueThreshold()) {
387391
vs = y.ValueStruct{
388392
Value: e.Value,
389393
Meta: e.meta,

stream_writer_test.go

+35-11
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ import (
3636
func getSortedKVList(valueSize, listSize int) *z.Buffer {
3737
value := make([]byte, valueSize)
3838
y.Check2(rand.Read(value))
39-
buf := z.NewBuffer(10 << 20, "test")
39+
buf := z.NewBuffer(10<<20, "test")
4040
for i := 0; i < listSize; i++ {
4141
key := make([]byte, 8)
4242
binary.BigEndian.PutUint64(key, uint64(i))
@@ -175,7 +175,7 @@ func TestStreamWriter3(t *testing.T) {
175175
value := make([]byte, valueSize)
176176
y.Check2(rand.Read(value))
177177
counter := 0
178-
buf := z.NewBuffer(10 << 20, "test")
178+
buf := z.NewBuffer(10<<20, "test")
179179
defer buf.Release()
180180
for i := 0; i < noOfKeys; i++ {
181181
key := make([]byte, 8)
@@ -272,7 +272,7 @@ func TestStreamWriter4(t *testing.T) {
272272
require.NoError(t, err, "error while updating db")
273273
}
274274

275-
buf := z.NewBuffer(10 << 20, "test")
275+
buf := z.NewBuffer(10<<20, "test")
276276
defer buf.Release()
277277
KVToBuffer(&pb.KV{
278278
Key: []byte("key-1"),
@@ -297,7 +297,7 @@ func TestStreamWriter5(t *testing.T) {
297297
right[0] = 0xff
298298
copy(right[1:], []byte("break"))
299299

300-
buf := z.NewBuffer(10 << 20, "test")
300+
buf := z.NewBuffer(10<<20, "test")
301301
defer buf.Release()
302302
KVToBuffer(&pb.KV{
303303
Key: left,
@@ -336,7 +336,7 @@ func TestStreamWriter6(t *testing.T) {
336336
// will be written to level 6, we need to insert at least 1 mb of data.
337337
// Setting keycount below 32 would cause this test to fail.
338338
keyCount := 40
339-
buf := z.NewBuffer(10 << 20, "test")
339+
buf := z.NewBuffer(10<<20, "test")
340340
defer buf.Release()
341341
for i := range str {
342342
for j := 0; j < keyCount; j++ {
@@ -377,7 +377,7 @@ func TestStreamWriterCancel(t *testing.T) {
377377
runBadgerTest(t, &opt, func(t *testing.T, db *DB) {
378378
str := []string{"a", "a", "b", "b", "c", "c"}
379379
ver := 1
380-
buf := z.NewBuffer(10 << 20, "test")
380+
buf := z.NewBuffer(10<<20, "test")
381381
defer buf.Release()
382382
for i := range str {
383383
kv := &pb.KV{
@@ -411,7 +411,7 @@ func TestStreamDone(t *testing.T) {
411411
var val [10]byte
412412
rand.Read(val[:])
413413
for i := 0; i < 10; i++ {
414-
buf := z.NewBuffer(10 << 20, "test")
414+
buf := z.NewBuffer(10<<20, "test")
415415
defer buf.Release()
416416
kv1 := &pb.KV{
417417
Key: []byte(fmt.Sprintf("%d", i)),
@@ -452,7 +452,7 @@ func TestSendOnClosedStream(t *testing.T) {
452452

453453
var val [10]byte
454454
rand.Read(val[:])
455-
buf := z.NewBuffer(10 << 20, "test")
455+
buf := z.NewBuffer(10<<20, "test")
456456
defer buf.Release()
457457
kv1 := &pb.KV{
458458
Key: []byte(fmt.Sprintf("%d", 1)),
@@ -475,7 +475,7 @@ func TestSendOnClosedStream(t *testing.T) {
475475
require.NoError(t, db.Close())
476476
}()
477477
// Send once stream is closed.
478-
buf1 := z.NewBuffer(10 << 20, "test")
478+
buf1 := z.NewBuffer(10<<20, "test")
479479
defer buf1.Release()
480480
kv1 = &pb.KV{
481481
Key: []byte(fmt.Sprintf("%d", 2)),
@@ -502,7 +502,7 @@ func TestSendOnClosedStream2(t *testing.T) {
502502

503503
var val [10]byte
504504
rand.Read(val[:])
505-
buf := z.NewBuffer(10 << 20, "test")
505+
buf := z.NewBuffer(10<<20, "test")
506506
defer buf.Release()
507507
kv1 := &pb.KV{
508508
Key: []byte(fmt.Sprintf("%d", 1)),
@@ -549,7 +549,7 @@ func TestStreamWriterEncrypted(t *testing.T) {
549549
key := []byte("mykey")
550550
value := []byte("myvalue")
551551

552-
buf := z.NewBuffer(10 << 20, "test")
552+
buf := z.NewBuffer(10<<20, "test")
553553
defer buf.Release()
554554
KVToBuffer(&pb.KV{
555555
Key: key,
@@ -578,3 +578,27 @@ func TestStreamWriterEncrypted(t *testing.T) {
578578
require.NoError(t, db.Close())
579579

580580
}
581+
582+
// Test that stream writer does not crashes with large values in managed mode. In managed mode, we
583+
// don't write to value log.
584+
func TestStreamWriterWithLargeValue(t *testing.T) {
585+
opts := DefaultOptions("")
586+
opts.managedTxns = true
587+
runBadgerTest(t, &opts, func(t *testing.T, db *DB) {
588+
buf := z.NewBuffer(10<<20, "test")
589+
defer buf.Release()
590+
val := make([]byte, 10<<20)
591+
_, err := rand.Read(val)
592+
require.NoError(t, err)
593+
KVToBuffer(&pb.KV{
594+
Key: []byte("key"),
595+
Value: val,
596+
Version: 1,
597+
}, buf)
598+
599+
sw := db.NewStreamWriter()
600+
require.NoError(t, sw.Prepare(), "sw.Prepare() failed")
601+
require.NoError(t, sw.Write(buf), "sw.Write() failed")
602+
require.NoError(t, sw.Flush(), "sw.Flush() failed")
603+
})
604+
}

0 commit comments

Comments
 (0)