feat(stream): add support for incremental stream writer #1722

Merged (7 commits) on Jul 6, 2021
Changes from 6 commits
2 changes: 1 addition & 1 deletion db.go
@@ -1741,7 +1741,7 @@ func (db *DB) prepareToDrop() (func(), error) {
// write it to db. Then, flush all the pending flush tasks so that we
// don't miss any entries.
if err := db.blockWrite(); err != nil {
return nil, err
return func() {}, err
}
reqs := make([]*request, 0, 10)
for {
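
The change above makes prepareToDrop return a callable no-op closure instead of nil on the blockWrite error path, so callers can store and invoke the returned cleanup without a nil check (the new PrepareIncremental below stores it into sw.done even when prepareToDrop fails). A minimal standalone sketch of that pattern, using hypothetical names rather than Badger's actual code:

```go
package main

import "errors"

// prepareToWork is a hypothetical stand-in for prepareToDrop: it returns a
// cleanup closure and an error.
func prepareToWork(fail bool) (func(), error) {
	if fail {
		// Return a no-op closure, never nil, so the error path stays safe to clean up.
		return func() {}, errors.New("could not block writes")
	}
	return func() { /* resume whatever was stopped */ }, nil
}

func main() {
	cleanup, err := prepareToWork(true)
	defer cleanup() // safe even when err != nil; no nil check required
	_ = err
}
```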
50 changes: 43 additions & 7 deletions stream_writer.go
@@ -74,18 +74,51 @@ func (db *DB) NewStreamWriter() *StreamWriter {
// Prepare should be called before writing any entry to StreamWriter. It deletes all data present in
// existing DB, stops compactions and any writes being done by other means. Be very careful when
// calling Prepare, because it could result in permanent data loss. Not calling Prepare would result
// in a corrupt Badger instance.
// in a corrupt Badger instance. Use PrepareIncremental to do incremental stream write.
func (sw *StreamWriter) Prepare() error {
sw.writeLock.Lock()
defer sw.writeLock.Unlock()

done, err := sw.db.dropAll()
// Ensure that done() is never called more than once.
var once sync.Once
sw.done = func() { once.Do(done) }
return err
}

// PrepareIncremental should be called before writing any entry to StreamWriter incrementally.
// In incremental stream write, the tables are written at one level above the current base level.
func (sw *StreamWriter) PrepareIncremental() error {
sw.writeLock.Lock()
defer sw.writeLock.Unlock()

// Ensure that done() is never called more than once.
var once sync.Once

// prepareToDrop will stop all the incoming writes and flush any pending flush tasks.
// Before we start writing, we'll stop the compactions because no one else should be writing to
// the same level as the stream writer is writing to.
f, err := sw.db.prepareToDrop()
if err != nil {
sw.done = func() { once.Do(f) }
return err
}
sw.db.stopCompactions()
done := func() {
sw.db.startCompactions()
f()
}
sw.done = func() { once.Do(done) }

return err
for _, level := range sw.db.Levels() {
if level.NumTables > 0 {
sw.prevLevel = level.Level
}
}
if sw.prevLevel == 0 {
return fmt.Errorf("Unable to do incremental writes because L0 has data")
}
return nil
}

// Write writes KVList to DB. Each KV within the list contains the stream id which StreamWriter
@@ -169,11 +202,6 @@ func (sw *StreamWriter) Write(buf *z.Buffer) error {
}

sw.processingKeys = true
if sw.prevLevel == 0 {
// If prevLevel is 0, that means that we have not written anything yet. Equivalently,
// we were virtually writing to the maxLevel+1.
sw.prevLevel = len(sw.db.lc.levels)
}
var meta, userMeta byte
if len(kv.Meta) > 0 {
meta = kv.Meta[0]
Expand Down Expand Up @@ -220,6 +248,14 @@ func (sw *StreamWriter) Write(buf *z.Buffer) error {
return err
}

// Moved this piece of code to within the lock.
if sw.prevLevel == 0 {
// If prevLevel is 0, that means that we have not written anything yet.
// So, we can write to the maxLevel. newWriter writes to prevLevel - 1,
// so we can set prevLevel to len(levels).
sw.prevLevel = len(sw.db.lc.levels)
}

for streamID, req := range streamReqs {
writer, ok := sw.writers[streamID]
if !ok {
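
Taken together, the new flow is: PrepareIncremental stops writes and compactions, records the deepest non-empty level as prevLevel, and Write then builds tables at prevLevel - 1 (or at the bottommost level when nothing has been written yet). A rough standalone usage sketch mirroring the test added below; the import paths, directory, buffer size, and the exported KVToBuffer helper are assumptions, not taken from this diff:

```go
package main

import (
	badger "github.com/dgraph-io/badger/v3"
	"github.com/dgraph-io/badger/v3/pb"
	"github.com/dgraph-io/ristretto/z"
)

func check(err error) {
	if err != nil {
		panic(err)
	}
}

func main() {
	db, err := badger.Open(badger.DefaultOptions("/tmp/badger-incremental"))
	check(err)
	defer db.Close()

	buf := z.NewBuffer(1<<20, "example")
	defer buf.Release()

	// First pass: a regular stream write. Prepare drops any data already in the DB.
	badger.KVToBuffer(&pb.KV{Key: []byte("key-1"), Value: []byte("val"), Version: 1}, buf)
	sw := db.NewStreamWriter()
	check(sw.Prepare())
	check(sw.Write(buf))
	check(sw.Flush())

	// Second pass: an incremental stream write. PrepareIncremental keeps the existing
	// data and stages the new tables one level above the current base level.
	buf.Reset()
	badger.KVToBuffer(&pb.KV{Key: []byte("key-2"), Value: []byte("val"), Version: 1}, buf)
	sw2 := db.NewStreamWriter()
	check(sw2.PrepareIncremental())
	check(sw2.Write(buf))
	check(sw2.Flush())
}
```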
33 changes: 33 additions & 0 deletions stream_writer_test.go
@@ -602,3 +602,36 @@ func TestStreamWriterWithLargeValue(t *testing.T) {
require.NoError(t, sw.Flush(), "sw.Flush() failed")
})
}

func TestStreamWriterIncremental(t *testing.T) {
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
buf := z.NewBuffer(10<<20, "test")
defer buf.Release()
KVToBuffer(&pb.KV{
Key: []byte("key-1"),
Value: []byte("val"),
Version: 1,
}, buf)
sw := db.NewStreamWriter()
require.NoError(t, sw.Prepare(), "sw.Prepare() failed")
require.NoError(t, sw.Write(buf), "sw.Write() failed")
require.NoError(t, sw.Flush(), "sw.Flush() failed")
buf.Reset()
KVToBuffer(&pb.KV{
Key: []byte("key-2"),
Value: []byte("val"),
Version: 1,
}, buf)
// Now do an incremental stream write.
sw2 := db.NewStreamWriter()
require.NoError(t, sw2.PrepareIncremental(), "sw.PrepareIncremental() failed")
require.NoError(t, sw2.Write(buf), "sw.Write() failed")
require.NoError(t, sw2.Flush(), "sw.Flush() failed")
txn := db.NewTransaction(false)
defer txn.Discard()
_, err := txn.Get([]byte("key-1"))
require.NoError(t, err)
_, err = txn.Get([]byte("key-2"))
require.NoError(t, err)
})
}