feat(stream): add support for incremental stream writer #1722

Merged · 7 commits · Jul 6, 2021
Changes from 5 commits

badger/cmd/write_bench.go (1 addition, 1 deletion)
@@ -201,7 +201,7 @@ func writeSorted(db *badger.DB, num uint64) error {
 	es := 8 + wo.valSz // key size is 8 bytes and value size is valSz
 
 	writer := db.NewStreamWriter()
-	if err := writer.Prepare(); err != nil {
+	if err := writer.Prepare(false); err != nil {
 		return err
 	}

db.go (1 addition, 1 deletion)
@@ -2179,7 +2179,7 @@ func (db *DB) StreamDB(outOptions Options) error {
 	}
 	defer outDB.Close()
 	writer := outDB.NewStreamWriter()
-	if err := writer.Prepare(); err != nil {
+	if err := writer.Prepare(false); err != nil {
 		y.Wrapf(err, "cannot create stream writer in out DB at %s", outDir)
 	}

db2_test.go (1 addition, 1 deletion)
@@ -948,7 +948,7 @@ func TestKeyCount(t *testing.T) {
 	es := 8 + valSz // key size is 8 bytes and value size is valSz
 
 	writer := db.NewStreamWriter()
-	require.NoError(t, writer.Prepare())
+	require.NoError(t, writer.Prepare(false))
 
 	wg := &sync.WaitGroup{}
 	writeCh := make(chan *pb.KVList, 3)

db_test.go (1 addition, 1 deletion)
@@ -2105,7 +2105,7 @@ func TestVerifyChecksum(t *testing.T) {
 	}
 
 	sw := db.NewStreamWriter()
-	require.NoError(t, sw.Prepare(), "sw.Prepare() failed")
+	require.NoError(t, sw.Prepare(false), "sw.Prepare(false) failed")
 	require.NoError(t, sw.Write(buf), "sw.Write() failed")
 	require.NoError(t, sw.Flush(), "sw.Flush() failed")

stream_writer.go (48 additions, 12 deletions)
@@ -74,18 +74,51 @@ func (db *DB) NewStreamWriter() *StreamWriter {
 // Prepare should be called before writing any entry to StreamWriter. It deletes all data present in
 // existing DB, stops compactions and any writes being done by other means. Be very careful when
 // calling Prepare, because it could result in permanent data loss. Not calling Prepare would result
-// in a corrupt Badger instance.
-func (sw *StreamWriter) Prepare() error {
+// in a corrupt Badger instance. It also supports an incremental stream write. In that case, it
+// writes the tables at one level above the current base level.
+func (sw *StreamWriter) Prepare(isIncremental bool) error {
 	sw.writeLock.Lock()
 	defer sw.writeLock.Unlock()
 
-	done, err := sw.db.dropAll()
-
 	// Ensure that done() is never called more than once.
 	var once sync.Once
-	sw.done = func() { once.Do(done) }
+	setDone := func(f func()) {
+		sw.done = func() {
+			if f != nil {
+				once.Do(f)
+			}
+		}
+	}
+	if !isIncremental {
+		done, err := sw.db.dropAll()
+		// Ensure that done() is never called more than once.
+		setDone(done)
+		return err
+	}
+
+	// prepareToDrop will stop all the incoming writes and flush any pending flush tasks.
+	// Before we start writing, we'll stop the compactions because no one else should be writing to
+	// the same level as the stream writer is writing to.
+	f, err := sw.db.prepareToDrop()
+	if err != nil {
+		setDone(f)
+		return err
+	}
+	sw.db.stopCompactions()
+	done := func() {
+		sw.db.startCompactions()
+		f()
+	}
+	setDone(done)
 
-	return err
+	for _, level := range sw.db.Levels() {
+		if level.NumTables > 0 {
+			sw.prevLevel = level.Level
+		}
+	}
+	if sw.prevLevel == 0 {
+		return fmt.Errorf("Unable to do incremental writes because L0 has data")
+	}
+	return nil
 }
 
 // Write writes KVList to DB. Each KV within the list contains the stream id which StreamWriter
@@ -169,11 +202,6 @@ func (sw *StreamWriter) Write(buf *z.Buffer) error {
 		}
 
 		sw.processingKeys = true
-		if sw.prevLevel == 0 {
-			// If prevLevel is 0, that means that we have not written anything yet. Equivalently,
-			// we were virtually writing to the maxLevel+1.
-			sw.prevLevel = len(sw.db.lc.levels)
-		}
 		var meta, userMeta byte
 		if len(kv.Meta) > 0 {
 			meta = kv.Meta[0]
@@ -220,6 +248,14 @@ func (sw *StreamWriter) Write(buf *z.Buffer) error {
 		return err
 	}
 
+	// Moved this piece of code to within the lock.
+	if sw.prevLevel == 0 {
+		// If prevLevel is 0, that means that we have not written anything yet.
+		// So, we can write to the maxLevel. newWriter writes to prevLevel - 1,
+		// so we can set prevLevel to len(levels).
+		sw.prevLevel = len(sw.db.lc.levels)
+	}
+
 	for streamID, req := range streamReqs {
 		writer, ok := sw.writers[streamID]
 		if !ok {
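
Editor's note: to summarize the API change for readers of the diff, Prepare(false) keeps the old drop-everything behavior, while Prepare(true) preserves existing data and places the new tables one level above the deepest non-empty level (prevLevel - 1). Below is a minimal caller-side sketch of the two modes; initialBuf and deltaBuf are illustrative placeholders for sorted KV data packed into *z.Buffer values, not identifiers from this PR.

	// Bulk load into an empty DB: Prepare(false) drops any existing data.
	sw := db.NewStreamWriter()
	if err := sw.Prepare(false); err != nil {
		return err
	}
	if err := sw.Write(initialBuf); err != nil { // initialBuf: sorted KVs in a *z.Buffer
		return err
	}
	if err := sw.Flush(); err != nil {
		return err
	}

	// Later, append more sorted data without dropping what is already there.
	// Prepare(true) returns an error if only L0 holds data.
	inc := db.NewStreamWriter()
	if err := inc.Prepare(true); err != nil {
		return err
	}
	if err := inc.Write(deltaBuf); err != nil {
		return err
	}
	if err := inc.Flush(); err != nil {
		return err
	}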

stream_writer_test.go (50 additions, 13 deletions)
@@ -59,7 +59,7 @@ func TestStreamWriter1(t *testing.T) {
 		valueSize := 128
 		list := getSortedKVList(valueSize, noOfKeys)
 		sw := db.NewStreamWriter()
-		require.NoError(t, sw.Prepare(), "sw.Prepare() failed")
+		require.NoError(t, sw.Prepare(false), "sw.Prepare(false) failed")
 		require.NoError(t, sw.Write(list), "sw.Write() failed")
 		require.NoError(t, sw.Flush(), "sw.Flush() failed")

@@ -110,7 +110,7 @@ func TestStreamWriter2(t *testing.T) {
 		valueSize := 128
 		list := getSortedKVList(valueSize, noOfKeys)
 		sw := db.NewStreamWriter()
-		require.NoError(t, sw.Prepare(), "sw.Prepare() failed")
+		require.NoError(t, sw.Prepare(false), "sw.Prepare(false) failed")
 		require.NoError(t, sw.Write(list), "sw.Write() failed")
 		// get max version of sw, will be used in transactions for managed mode
 		maxVs := sw.maxVersion
@@ -189,7 +189,7 @@ func TestStreamWriter3(t *testing.T) {
 		}
 
 		sw := db.NewStreamWriter()
-		require.NoError(t, sw.Prepare(), "sw.Prepare() failed")
+		require.NoError(t, sw.Prepare(false), "sw.Prepare(false) failed")
 		require.NoError(t, sw.Write(buf), "sw.Write() failed")
 		// get max version of sw, will be used in transactions for managed mode
 		maxVs := sw.maxVersion
@@ -281,7 +281,7 @@ func TestStreamWriter4(t *testing.T) {
 		}, buf)
 
 		sw := db.NewStreamWriter()
-		require.NoError(t, sw.Prepare(), "sw.Prepare() failed")
+		require.NoError(t, sw.Prepare(false), "sw.Prepare(false) failed")
 		require.NoError(t, sw.Write(buf), "sw.Write() failed")
 		require.NoError(t, sw.Flush(), "sw.Flush() failed")
 	})
@@ -311,7 +311,7 @@ func TestStreamWriter5(t *testing.T) {
 		}, buf)
 
 		sw := db.NewStreamWriter()
-		require.NoError(t, sw.Prepare(), "sw.Prepare() failed")
+		require.NoError(t, sw.Prepare(false), "sw.Prepare(false) failed")
 		require.NoError(t, sw.Write(buf), "sw.Write() failed")
 		require.NoError(t, sw.Flush(), "sw.Flush() failed")
 		require.NoError(t, db.Close())
@@ -353,7 +353,7 @@ func TestStreamWriter6(t *testing.T) {
 		// list has 3 pairs for equal keys. Since each Key has size equal to MaxTableSize
 		// we would have 6 tables, if keys are not equal. Here we should have 3 tables.
 		sw := db.NewStreamWriter()
-		require.NoError(t, sw.Prepare(), "sw.Prepare() failed")
+		require.NoError(t, sw.Prepare(false), "sw.Prepare(false) failed")
 		require.NoError(t, sw.Write(buf), "sw.Write() failed")
 		require.NoError(t, sw.Flush(), "sw.Flush() failed")

@@ -390,14 +390,14 @@ func TestStreamWriterCancel(t *testing.T) {
 		}
 
 		sw := db.NewStreamWriter()
-		require.NoError(t, sw.Prepare(), "sw.Prepare() failed")
+		require.NoError(t, sw.Prepare(false), "sw.Prepare(false) failed")
 		require.NoError(t, sw.Write(buf), "sw.Write() failed")
 		sw.Cancel()
 
 		// Use the API incorrectly.
 		sw1 := db.NewStreamWriter()
 		defer sw1.Cancel()
-		require.NoError(t, sw1.Prepare())
+		require.NoError(t, sw1.Prepare(false))
 		defer sw1.Cancel()
 		sw1.Flush()
 	})
@@ -406,7 +406,7 @@
 func TestStreamDone(t *testing.T) {
 	runBadgerTest(t, nil, func(t *testing.T, db *DB) {
 		sw := db.NewStreamWriter()
-		require.NoError(t, sw.Prepare(), "sw.Prepare() failed")
+		require.NoError(t, sw.Prepare(false), "sw.Prepare(false) failed")
 
 		var val [10]byte
 		rand.Read(val[:])
@@ -448,7 +448,7 @@ func TestSendOnClosedStream(t *testing.T) {
 	require.NoError(t, err)
 
 	sw := db.NewStreamWriter()
-	require.NoError(t, sw.Prepare(), "sw.Prepare() failed")
+	require.NoError(t, sw.Prepare(false), "sw.Prepare(false) failed")
 
 	var val [10]byte
 	rand.Read(val[:])
@@ -498,7 +498,7 @@ func TestSendOnClosedStream2(t *testing.T) {
 	require.NoError(t, err)
 
 	sw := db.NewStreamWriter()
-	require.NoError(t, sw.Prepare(), "sw.Prepare() failed")
+	require.NoError(t, sw.Prepare(false), "sw.Prepare(false) failed")
 
 	var val [10]byte
 	rand.Read(val[:])
@@ -558,7 +558,7 @@ func TestStreamWriterEncrypted(t *testing.T) {
 	}, buf)
 
 	sw := db.NewStreamWriter()
-	require.NoError(t, sw.Prepare(), "Prepare failed")
+	require.NoError(t, sw.Prepare(false), "Prepare failed")
 	require.NoError(t, sw.Write(buf), "Write failed")
 	require.NoError(t, sw.Flush(), "Flush failed")

@@ -597,8 +597,45 @@ func TestStreamWriterWithLargeValue(t *testing.T) {
 		}, buf)
 
 		sw := db.NewStreamWriter()
-		require.NoError(t, sw.Prepare(), "sw.Prepare() failed")
+		require.NoError(t, sw.Prepare(false), "sw.Prepare(false) failed")
 		require.NoError(t, sw.Write(buf), "sw.Write() failed")
 		require.NoError(t, sw.Flush(), "sw.Flush() failed")
 	})
 }
+
+func TestStreamWriterIncremental(t *testing.T) {
+	runBadgerTest(t, nil, func(t *testing.T, db *DB) {
+		buf := z.NewBuffer(10<<20, "test")
+		defer buf.Release()
+		KVToBuffer(&pb.KV{
+			Key:     []byte("key-1"),
+			Value:   []byte("val"),
+			Version: 1,
+		}, buf)
+
+		sw := db.NewStreamWriter()
+		require.NoError(t, sw.Prepare(false), "sw.Prepare(false) failed")
+		require.NoError(t, sw.Write(buf), "sw.Write() failed")
+		require.NoError(t, sw.Flush(), "sw.Flush() failed")
+
+		buf.Reset()
+		KVToBuffer(&pb.KV{
+			Key:     []byte("key-2"),
+			Value:   []byte("val"),
+			Version: 1,
+		}, buf)
+		// Now do an incremental stream write.
+		sw2 := db.NewStreamWriter()
+		require.NoError(t, sw2.Prepare(true), "sw.Prepare(true) failed")
+		require.NoError(t, sw2.Write(buf), "sw.Write() failed")
+		require.NoError(t, sw2.Flush(), "sw.Flush() failed")
+
+		txn := db.NewTransaction(false)
+		defer txn.Discard()
+		_, err := txn.Get([]byte("key-1"))
+		require.NoError(t, err)
+		_, err = txn.Get([]byte("key-2"))
+		require.NoError(t, err)
+	})
+}
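
Editor's note: one way to sanity-check where an incremental write landed is to dump per-level table counts after Flush. A hypothetical snippet, not part of this PR, using the same db.Levels() / LevelInfo fields that Prepare(true) reads above:

	// Print table counts per level. Level and NumTables are the LevelInfo
	// fields Prepare(true) uses to find the deepest non-empty level.
	for _, li := range db.Levels() {
		fmt.Printf("L%d: %d tables\n", li.Level, li.NumTables)
	}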