diff --git a/db.go b/db.go
index 02db7ef76..9989d640a 100644
--- a/db.go
+++ b/db.go
@@ -1741,7 +1741,7 @@ func (db *DB) prepareToDrop() (func(), error) {
 	// write it to db. Then, flush all the pending flushtask. So that, we
 	// don't miss any entries.
 	if err := db.blockWrite(); err != nil {
-		return nil, err
+		return func() {}, err
 	}
 	reqs := make([]*request, 0, 10)
 	for {
diff --git a/stream_writer.go b/stream_writer.go
index 3df88f5a6..08735a453 100644
--- a/stream_writer.go
+++ b/stream_writer.go
@@ -74,18 +74,57 @@ func (db *DB) NewStreamWriter() *StreamWriter {
 // Prepare should be called before writing any entry to StreamWriter. It deletes all data present in
 // existing DB, stops compactions and any writes being done by other means. Be very careful when
 // calling Prepare, because it could result in permanent data loss. Not calling Prepare would result
-// in a corrupt Badger instance.
+// in a corrupt Badger instance. Use PrepareIncremental to do an incremental stream write.
 func (sw *StreamWriter) Prepare() error {
 	sw.writeLock.Lock()
 	defer sw.writeLock.Unlock()
 
 	done, err := sw.db.dropAll()
+	// Ensure that done() is never called more than once.
+	var once sync.Once
+	sw.done = func() { once.Do(done) }
+	return err
+}
+
+// PrepareIncremental should be called before writing any entry to StreamWriter incrementally.
+// In an incremental stream write, the tables are written one level above the current base level.
+func (sw *StreamWriter) PrepareIncremental() error {
+	sw.writeLock.Lock()
+	defer sw.writeLock.Unlock()
 
 	// Ensure that done() is never called more than once.
 	var once sync.Once
+
+	// prepareToDrop will stop all the incoming writes and process any pending flush tasks.
+	// Before we start writing, we'll stop the compactions because no one else should be writing to
+	// the same level as the stream writer is writing to.
+	f, err := sw.db.prepareToDrop()
+	if err != nil {
+		sw.done = func() { once.Do(f) }
+		return err
+	}
+	sw.db.stopCompactions()
+	done := func() {
+		sw.db.startCompactions()
+		f()
+	}
 	sw.done = func() { once.Do(done) }
 
-	return err
+	isEmptyDB := true
+	for _, level := range sw.db.Levels() {
+		if level.NumTables > 0 {
+			sw.prevLevel = level.Level
+			isEmptyDB = false
+		}
+	}
+	if isEmptyDB {
+		// If DB is empty, we should allow doing incremental stream write.
+		return nil
+	}
+	if sw.prevLevel == 0 {
+		return fmt.Errorf("Unable to do incremental writes because L0 has data")
+	}
+	return nil
 }
 
 // Write writes KVList to DB. Each KV within the list contains the stream id which StreamWriter
@@ -169,11 +208,6 @@ func (sw *StreamWriter) Write(buf *z.Buffer) error {
 		}
 		sw.processingKeys = true
 
-		if sw.prevLevel == 0 {
-			// If prevLevel is 0, that means that we have not written anything yet. Equivalently,
-			// we were virtually writing to the maxLevel+1.
-			sw.prevLevel = len(sw.db.lc.levels)
-		}
 		var meta, userMeta byte
 		if len(kv.Meta) > 0 {
 			meta = kv.Meta[0]
@@ -220,6 +254,14 @@ func (sw *StreamWriter) Write(buf *z.Buffer) error {
 		return err
 	}
 
+	// Moved this piece of code to within the lock.
+	if sw.prevLevel == 0 {
+		// If prevLevel is 0, that means that we have not written anything yet.
+		// So, we can write to the maxLevel. newWriter writes to prevLevel - 1,
+		// so we can set prevLevel to len(levels).
+		sw.prevLevel = len(sw.db.lc.levels)
+	}
+
 	for streamID, req := range streamReqs {
 		writer, ok := sw.writers[streamID]
 		if !ok {
diff --git a/stream_writer_test.go b/stream_writer_test.go
index bdc520a91..eb82b8c9f 100644
--- a/stream_writer_test.go
+++ b/stream_writer_test.go
@@ -602,3 +602,54 @@ func TestStreamWriterWithLargeValue(t *testing.T) {
 		require.NoError(t, sw.Flush(), "sw.Flush() failed")
 	})
 }
+
+func TestStreamWriterIncremental(t *testing.T) {
+	addIncremental := func(t *testing.T, db *DB) {
+		buf := z.NewBuffer(10<<20, "test")
+		defer buf.Release()
+		KVToBuffer(&pb.KV{
+			Key:     []byte("key-2"),
+			Value:   []byte("val"),
+			Version: 1,
+		}, buf)
+		// Now do an incremental stream write.
+		sw := db.NewStreamWriter()
+		require.NoError(t, sw.PrepareIncremental(), "sw.PrepareIncremental() failed")
+		require.NoError(t, sw.Write(buf), "sw.Write() failed")
+		require.NoError(t, sw.Flush(), "sw.Flush() failed")
+	}
+
+	t.Run("incremental on non-empty DB", func(t *testing.T) {
+		runBadgerTest(t, nil, func(t *testing.T, db *DB) {
+			buf := z.NewBuffer(10<<20, "test")
+			defer buf.Release()
+			KVToBuffer(&pb.KV{
+				Key:     []byte("key-1"),
+				Value:   []byte("val"),
+				Version: 1,
+			}, buf)
+			sw := db.NewStreamWriter()
+			require.NoError(t, sw.Prepare(), "sw.Prepare() failed")
+			require.NoError(t, sw.Write(buf), "sw.Write() failed")
+			require.NoError(t, sw.Flush(), "sw.Flush() failed")
+
+			addIncremental(t, db)
+
+			txn := db.NewTransaction(false)
+			defer txn.Discard()
+			_, err := txn.Get([]byte("key-1"))
+			require.NoError(t, err)
+			_, err = txn.Get([]byte("key-2"))
+			require.NoError(t, err)
+		})
+	})
+	t.Run("incremental on empty DB", func(t *testing.T) {
+		runBadgerTest(t, nil, func(t *testing.T, db *DB) {
+			addIncremental(t, db)
+			txn := db.NewTransaction(false)
+			defer txn.Discard()
+			_, err := txn.Get([]byte("key-2"))
+			require.NoError(t, err)
+		})
+	})
+}
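
Reviewer note: below is a minimal usage sketch of the new API, written in the style of the tests above. It reuses the test-only KVToBuffer helper and an already-open *DB, so it compiles only alongside the package tests; the helper name incrementalPut, the buffer size, and the single-KV payload are illustrative assumptions, not part of this change.

	func incrementalPut(db *DB, key, val []byte) error {
		buf := z.NewBuffer(10<<20, "example")
		defer buf.Release()
		// Encode one versioned KV into the buffer, as the tests above do.
		KVToBuffer(&pb.KV{Key: key, Value: val, Version: 1}, buf)

		sw := db.NewStreamWriter()
		// PrepareIncremental keeps existing data, whereas Prepare drops it.
		// Per the check added above, it fails when L0 already holds tables.
		if err := sw.PrepareIncremental(); err != nil {
			return err
		}
		if err := sw.Write(buf); err != nil {
			return err
		}
		// Flush persists the data and invokes done(), which restarts
		// compactions and unblocks normal writes.
		return sw.Flush()
	}

Note the ordering: PrepareIncremental must run before any Write, since it blocks other writers and stops compactions for the target level, and Flush re-enables both via done().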