From e15befb4c3973fa43ab4b829d1473130006b835e Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Wed, 12 Aug 2020 06:42:56 -0700 Subject: [PATCH 1/7] Switch levels.go compaction to only build 5 tables at a time. (cherry picked from commit e254eccfcb322a097fac381bd7ef313ebb2b717b) --- badger/main.go | 2 +- levels.go | 83 +++++++++++++++++++++++--------------------------- 2 files changed, 39 insertions(+), 46 deletions(-) diff --git a/badger/main.go b/badger/main.go index 7542f1e17..1d6bcd65e 100644 --- a/badger/main.go +++ b/badger/main.go @@ -29,7 +29,7 @@ func main() { go func() { for i := 8080; i < 9080; i++ { fmt.Printf("Listening for /debug HTTP requests at port: %d\n", i) - if err := http.ListenAndServe(fmt.Sprintf("localhost:%d", i), nil); err != nil { + if err := http.ListenAndServe(fmt.Sprintf("0.0.0.0:%d", i), nil); err != nil { fmt.Println("Port busy. Trying another one...") continue diff --git a/levels.go b/levels.go index 78509391c..f4b7e4108 100644 --- a/levels.go +++ b/levels.go @@ -541,15 +541,14 @@ nextTable: // that would affect the snapshot view guarantee provided by transactions. discardTs := s.kv.orc.discardAtOrBelow() - // Start generating new tables. - type newTableResult struct { - table *table.Table - err error - } - resultCh := make(chan newTableResult) var numBuilds, numVersions int var lastKey, skipKey []byte var vp valuePointer + var newTables []*table.Table + mu := new(sync.Mutex) // Guards newTables + + inflightBuilders := y.NewThrottle(5) + y.Check(inflightBuilders.Do()) for it.Valid() { timeStart := time.Now() dk, err := s.kv.registry.latestDataKey() @@ -646,19 +645,6 @@ nextTable: // called Add() at least once, and builder is not Empty(). s.kv.opt.Debugf("LOG Compact. Added %d keys. Skipped %d keys. Iteration took: %v", numKeys, numSkips, time.Since(timeStart)) - build := func(fileID uint64) (*table.Table, error) { - fd, err := y.CreateSyncedFile(table.NewFilename(fileID, s.kv.opt.Dir), true) - if err != nil { - return nil, errors.Wrapf(err, "While opening new table: %d", fileID) - } - - if _, err := fd.Write(builder.Finish()); err != nil { - return nil, errors.Wrapf(err, "Unable to write to file: %d", fileID) - } - tbl, err := table.OpenTable(fd, bopts) - // decrRef is added below. - return tbl, errors.Wrapf(err, "Unable to open table: %q", fd.Name()) - } if builder.Empty() { // Cleanup builder resources: builder.Finish() @@ -667,49 +653,56 @@ nextTable: } numBuilds++ fileID := s.reserveFileID() + if err := inflightBuilders.Do(); err != nil { + // Can't return from here, until I decrRef all the tables that I built so far. + break + } go func(builder *table.Builder) { defer builder.Close() - var ( - tbl *table.Table - err error - ) + + build := func(fileID uint64) (*table.Table, error) { + fd, err := y.CreateSyncedFile(table.NewFilename(fileID, s.kv.opt.Dir), true) + if err != nil { + return nil, errors.Wrapf(err, "While opening new table: %d", fileID) + } + + if _, err := fd.Write(builder.Finish(false)); err != nil { + return nil, errors.Wrapf(err, "Unable to write to file: %d", fileID) + } + tbl, err := table.OpenTable(fd, bopts) + // decrRef is added below. 
+ return tbl, errors.Wrapf(err, "Unable to open table: %q", fd.Name()) + } + + var tbl *table.Table + var err error if s.kv.opt.InMemory { tbl, err = table.OpenInMemoryTable(builder.Finish(), fileID, &bopts) } else { tbl, err = build(fileID) } - resultCh <- newTableResult{tbl, err} - }(builder) - } + inflightBuilders.Done(err) - newTables := make([]*table.Table, 0, 20) - // Wait for all table builders to finish. - var firstErr error - for x := 0; x < numBuilds; x++ { - res := <-resultCh - newTables = append(newTables, res.table) - if firstErr == nil { - firstErr = res.err - } + mu.Lock() + newTables = append(newTables, tbl) + mu.Unlock() + }(builder) } - if firstErr == nil { + // Wait for all table builders to finish and also for newTables accumulator to finish. + err := inflightBuilders.Finish() + if err == nil { // Ensure created files' directory entries are visible. We don't mind the extra latency // from not doing this ASAP after all file creation has finished because this is a // background operation. - firstErr = s.kv.syncDir(s.kv.opt.Dir) + err = s.kv.syncDir(s.kv.opt.Dir) } - if firstErr != nil { + if err != nil { // An error happened. Delete all the newly created table files (by calling DecrRef // -- we're the only holders of a ref). - for j := 0; j < numBuilds; j++ { - if newTables[j] != nil { - _ = newTables[j].DecrRef() - } - } - errorReturn := errors.Wrapf(firstErr, "While running compaction for: %+v", cd) - return nil, nil, errorReturn + _ = decrRefs(newTables) + return nil, nil, errors.Wrapf(err, "while running compactions for: %+v", cd) } sort.Slice(newTables, func(i, j int) bool { From 5c2b47c2549b55a53bbe86af13757c1e2ced2534 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Wed, 12 Aug 2020 07:55:21 -0700 Subject: [PATCH 2/7] Have a dedicated compactor for L0 and L1. (cherry picked from commit f416452ca4c029d9629c84f2cd23218982687fa1) --- db.go | 7 +++++-- level_handler.go | 5 ++--- levels.go | 47 +++++++++++++++++++++++++++++++---------------- options.go | 6 +++--- 4 files changed, 41 insertions(+), 24 deletions(-) diff --git a/db.go b/db.go index e3416515d..dd515e473 100644 --- a/db.go +++ b/db.go @@ -192,6 +192,9 @@ func (db *DB) replayFunction() func(Entry, valuePointer) error { // Open returns a new DB object. func Open(opt Options) (db *DB, err error) { + if opt.NumCompactors < 2 { + return nil, errors.New("Cannot have less than 2 compactors") + } if opt.InMemory && (opt.Dir != "" || opt.ValueDir != "") { return nil, errors.New("Cannot use badger in Disk-less mode with Dir or ValueDir set") } @@ -528,7 +531,7 @@ func (db *DB) close() (err error) { // Force Compact L0 // We don't need to care about cstatus since no parallel compaction is running. if db.opt.CompactL0OnClose { - err := db.lc.doCompact(compactionPriority{level: 0, score: 1.73}) + err := db.lc.doCompact(173, compactionPriority{level: 0, score: 1.73}) switch err { case errFillTables: // This error only means that there might be enough tables to do a compaction. So, we @@ -1455,7 +1458,7 @@ func (db *DB) Flatten(workers int) error { errCh := make(chan error, 1) for i := 0; i < workers; i++ { go func() { - errCh <- db.lc.doCompact(cp) + errCh <- db.lc.doCompact(174, cp) }() } var success int diff --git a/level_handler.go b/level_handler.go index 19ba0892b..ce48965fe 100644 --- a/level_handler.go +++ b/level_handler.go @@ -188,9 +188,8 @@ func (s *levelHandler) tryAddLevel0Table(t *table.Table) bool { // Need lock as we may be deleting the first table during a level 0 compaction. 
 	s.Lock()
 	defer s.Unlock()
 
-	// Return false only if L0 is in memory and number of tables is more than number of
-	// ZeroTableStall. For on disk L0, we should just add the tables to the level.
-	if s.db.opt.KeepL0InMemory && len(s.tables) >= s.db.opt.NumLevelZeroTablesStall {
+	// Stall (by returning false) if we are above the specified stall setting for L0.
+	if len(s.tables) >= s.db.opt.NumLevelZeroTablesStall {
 		return false
 	}
 
diff --git a/levels.go b/levels.go
index f4b7e4108..6cba8d55a 100644
--- a/levels.go
+++ b/levels.go
@@ -29,6 +29,7 @@ import (
 
 	"golang.org/x/net/trace"
 
+	"github.com/dgraph-io/badger/v2/manual"
 	"github.com/dgraph-io/badger/v2/pb"
 	"github.com/dgraph-io/badger/v2/table"
 	"github.com/dgraph-io/badger/v2/y"
@@ -306,7 +307,7 @@ func (s *levelsController) dropPrefixes(prefixes [][]byte) error {
 			// function in logs, and forces a compaction.
 			dropPrefixes: prefixes,
 		}
-		if err := s.doCompact(cp); err != nil {
+		if err := s.doCompact(175, cp); err != nil {
 			opt.Warningf("While compacting level 0: %v", err)
 			return nil
 		}
@@ -366,11 +367,13 @@ func (s *levelsController) startCompact(lc *y.Closer) {
 	n := s.kv.opt.NumCompactors
 	lc.AddRunning(n - 1)
 	for i := 0; i < n; i++ {
-		go s.runWorker(lc)
+		// The worker with id=0 is dedicated to L0 and L1, and is counted
+		// towards the user specified NumCompactors (hence the minimum of 2).
+		go s.runCompactor(i, lc)
 	}
 }
 
-func (s *levelsController) runWorker(lc *y.Closer) {
+func (s *levelsController) runCompactor(id int, lc *y.Closer) {
 	defer lc.Done()
 
 	randomDelay := time.NewTimer(time.Duration(rand.Int31n(1000)) * time.Millisecond)
@@ -381,7 +384,7 @@ func (s *levelsController) runWorker(lc *y.Closer) {
 		return
 	}
 
-	ticker := time.NewTicker(time.Second)
+	ticker := time.NewTicker(100 * time.Millisecond)
 	defer ticker.Stop()
 
 	for {
@@ -391,7 +394,15 @@ func (s *levelsController) runWorker(lc *y.Closer) {
 			prios := s.pickCompactLevels()
 		loop:
 			for _, p := range prios {
-				err := s.doCompact(p)
+				if id == 0 && p.level > 1 {
+					// If I'm ID zero, I only compact L0 and L1.
+					continue
+				}
+				if id != 0 && p.level <= 1 {
+					// If I'm ID non-zero, I do NOT compact L0 and L1.
+					continue
+				}
+				err := s.doCompact(id, p)
 				switch err {
 				case nil:
 					break loop
@@ -453,10 +464,11 @@ func (s *levelsController) pickCompactLevels() (prios []compactionPriority) {
 			prios = append(prios, pri)
 		}
 	}
-	// We used to sort compaction priorities based on the score. But, we
-	// decided to compact based on the level, not the priority. So, upper
-	// levels (level 0, level 1, etc) always get compacted first, before the
-	// lower levels -- this allows us to avoid stalls.
+	// Sort the compaction priorities by score. Now that we have a dedicated
+	// compactor for L0 and L1, we don't need to sort by level here.
+	sort.Slice(prios, func(i, j int) bool {
+		return prios[i].score > prios[j].score
+	})
 	return prios
 }
 
@@ -548,7 +560,6 @@ nextTable:
 	mu := new(sync.Mutex) // Guards newTables
 
 	inflightBuilders := y.NewThrottle(5)
-	y.Check(inflightBuilders.Do())
 	for it.Valid() {
 		timeStart := time.Now()
 		dk, err := s.kv.registry.latestDataKey()
@@ -685,6 +696,9 @@ nextTable:
 
 			mu.Lock()
 			newTables = append(newTables, tbl)
+			num := atomic.LoadInt32(&table.NumBlocks)
+			allocs := float64(atomic.LoadInt64(&manual.NumAllocs)) / float64((1 << 20))
+			fmt.Printf("Num Blocks: %d. Num Allocs (MB): %.2f\n", num, allocs)
 			mu.Unlock()
 		}(builder)
 	}
 
@@ -956,7 +970,7 @@ func (s *levelsController) runCompactDef(l int, cd compactDef) (err error) {
 var errFillTables = errors.New("Unable to fill tables")
 
 // doCompact picks some table on level l and compacts it away to the next level.
-func (s *levelsController) doCompact(p compactionPriority) error {
+func (s *levelsController) doCompact(id int, p compactionPriority) error {
 	l := p.level
 	y.AssertTrue(l+1 < s.kv.opt.MaxLevels) // Sanity check.
 
@@ -969,7 +983,7 @@ func (s *levelsController) doCompact(p compactionPriority) error {
 	cd.elog.SetMaxEvents(100)
 	defer cd.elog.Finish()
 
-	s.kv.opt.Infof("Got compaction priority: %+v", p)
+	s.kv.opt.Debugf("[Compactor: %d] Attempting to run compaction: %+v", id, p)
 
 	// While picking tables to be compacted, both levels' tables are expected to
 	// remain unchanged.
@@ -985,16 +999,17 @@ func (s *levelsController) doCompact(p compactionPriority) error {
 	}
 	defer s.cstatus.delete(cd) // Remove the ranges from compaction status.
 
-	s.kv.opt.Infof("Running for level: %d\n", cd.thisLevel.level)
+	s.kv.opt.Infof("[Compactor: %d] Running compaction: %+v for level: %d\n",
+		id, p, cd.thisLevel.level)
 	s.cstatus.toLog(cd.elog)
 	if err := s.runCompactDef(l, cd); err != nil {
 		// This compaction couldn't be done successfully.
-		s.kv.opt.Warningf("LOG Compact FAILED with error: %+v: %+v", err, cd)
+		s.kv.opt.Warningf("[Compactor: %d] LOG Compact FAILED with error: %+v: %+v", id, err, cd)
 		return err
 	}
 
 	s.cstatus.toLog(cd.elog)
-	s.kv.opt.Infof("Compaction for level: %d DONE", cd.thisLevel.level)
+	s.kv.opt.Infof("[Compactor: %d] Compaction for level: %d DONE", id, cd.thisLevel.level)
 	return nil
 }
 
@@ -1018,7 +1033,7 @@ func (s *levelsController) addLevel0Table(t *table.Table) error {
 		// Stall. Make sure all levels are healthy before we unstall.
 		var timeStart time.Time
 		{
-			s.kv.opt.Debugf("STALLED STALLED STALLED: %v\n", time.Since(s.lastUnstalled))
+			s.kv.opt.Infof("STALLED STALLED STALLED: %v\n", time.Since(s.lastUnstalled))
 			s.cstatus.RLock()
 			for i := 0; i < s.kv.opt.MaxLevels; i++ {
 				s.kv.opt.Debugf("level=%d. Status=%s Size=%d\n",
diff --git a/options.go b/options.go
index 36d073b78..80e641376 100644
--- a/options.go
+++ b/options.go
@@ -129,9 +129,9 @@ func DefaultOptions(path string) Options {
 		// table.Nothing to not preload the tables.
 		MaxLevels:               7,
 		MaxTableSize:            64 << 20,
-		NumCompactors:           2, // Compactions can be expensive. Only run 2.
+		NumCompactors:           2, // Run at least 2 compactors. One is dedicated for L0 and L1.
 		NumLevelZeroTables:      5,
-		NumLevelZeroTablesStall: 10,
+		NumLevelZeroTablesStall: 15,
 		NumMemtables:            5,
 		BloomFalsePositive:      0.01,
 		BlockSize:               4 * 1024,
@@ -460,7 +460,7 @@ func (opt Options) WithValueLogMaxEntries(val uint32) Options {
 // NumCompactors sets the number of compaction workers to run concurrently.
 // Setting this to zero stops compactions, which could eventually cause writes to block forever.
 //
-// The default value of NumCompactors is 2.
+// The default value of NumCompactors is 2. One is dedicated just for L0 and L1.
 func (opt Options) WithNumCompactors(val int) Options {
 	opt.NumCompactors = val
 	return opt

From 6c03cb8d4664ff0cd63886c59558432f2ca21af3 Mon Sep 17 00:00:00 2001
From: Ibrahim Jarif
Date: Fri, 14 Aug 2020 15:48:27 +0530
Subject: [PATCH 3/7] Fix numCompactor error

(cherry picked from commit 1c1e17c9bcb7ecf0399afed372733ede5867c9e5)
---
 db.go | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/db.go b/db.go
index dd515e473..38051a94f 100644
--- a/db.go
+++ b/db.go
@@ -192,8 +192,11 @@ func (db *DB) replayFunction() func(Entry, valuePointer) error {
 
 // Open returns a new DB object.
 func Open(opt Options) (db *DB, err error) {
-	if opt.NumCompactors < 2 {
-		return nil, errors.New("Cannot have less than 2 compactors")
+	// It's okay to have zero compactors, which will disable all compactions,
+	// but we cannot have just one compactor; otherwise we will end up with
+	// all data on level 2.
+	if opt.NumCompactors == 1 {
+		return nil, errors.New("Cannot have 1 compactor. Need at least 2")
 	}
 	if opt.InMemory && (opt.Dir != "" || opt.ValueDir != "") {
 		return nil, errors.New("Cannot use badger in Disk-less mode with Dir or ValueDir set")

From 6c33afef9f356ebed50a519ec697c583ff4c9183 Mon Sep 17 00:00:00 2001
From: Ibrahim Jarif
Date: Fri, 14 Aug 2020 15:49:05 +0530
Subject: [PATCH 4/7] Handle nil table in compaction and test fixes

(cherry picked from commit 233be8ee6467c6aad08fce5706a1e672bdc9b97b)
---
 db.go            |  2 +-
 db2_test.go      |  2 +-
 levels.go        | 12 ++++--
 levels_test.go   | 96 +++++++++++++++++++++---------------------
 manifest_test.go |  2 +-
 5 files changed, 52 insertions(+), 62 deletions(-)

diff --git a/db.go b/db.go
index 38051a94f..b6edb7265 100644
--- a/db.go
+++ b/db.go
@@ -1461,7 +1461,7 @@ func (db *DB) Flatten(workers int) error {
 		errCh := make(chan error, 1)
 		for i := 0; i < workers; i++ {
 			go func() {
-				errCh <- db.lc.doCompact(174, cp)
+				errCh <- db.lc.doCompact(175, cp)
 			}()
 		}
 		var success int
diff --git a/db2_test.go b/db2_test.go
index 4c29d6358..dadd4966d 100644
--- a/db2_test.go
+++ b/db2_test.go
@@ -547,7 +547,7 @@ func createTableWithRange(t *testing.T, db *DB, start, end int) *table.Table {
 	fd, err := y.CreateSyncedFile(table.NewFilename(fileID, db.opt.Dir), true)
 	require.NoError(t, err)
 
-	_, err = fd.Write(b.Finish())
+	_, err = fd.Write(b.Finish(false))
 	require.NoError(t, err, "unable to write to file")
 
 	tab, err := table.OpenTable(fd, bopts)
diff --git a/levels.go b/levels.go
index 6cba8d55a..dea27d5bb 100644
--- a/levels.go
+++ b/levels.go
@@ -29,7 +29,6 @@ import (
 
 	"golang.org/x/net/trace"
 
-	"github.com/dgraph-io/badger/v2/manual"
 	"github.com/dgraph-io/badger/v2/pb"
 	"github.com/dgraph-io/badger/v2/table"
 	"github.com/dgraph-io/badger/v2/y"
@@ -307,7 +306,7 @@ func (s *levelsController) dropPrefixes(prefixes [][]byte) error {
 			// function in logs, and forces a compaction.
 			dropPrefixes: prefixes,
 		}
-		if err := s.doCompact(175, cp); err != nil {
+		if err := s.doCompact(174, cp); err != nil {
 			opt.Warningf("While compacting level 0: %v", err)
 			return nil
 		}
@@ -694,11 +693,16 @@ nextTable:
 			}
 			inflightBuilders.Done(err)
 
+			// If we couldn't build the table, return fast.
+			if err != nil {
+				return
+			}
+
 			mu.Lock()
 			newTables = append(newTables, tbl)
 			num := atomic.LoadInt32(&table.NumBlocks)
-			allocs := float64(atomic.LoadInt64(&manual.NumAllocs)) / float64((1 << 20))
-			fmt.Printf("Num Blocks: %d. Num Allocs (MB): %.2f\n", num, allocs)
+			allocs := float64(atomic.LoadInt64(&y.NumAllocs)) / float64((1 << 20))
+			s.kv.opt.Logger.Debugf("Num Blocks: %d. Num Allocs (MB): %.2f\n", num, allocs)
 			mu.Unlock()
 		}(builder)
 	}
diff --git a/levels_test.go b/levels_test.go
index b50217ed3..f8aa65e89 100644
--- a/levels_test.go
+++ b/levels_test.go
@@ -49,7 +49,7 @@ func createAndOpen(db *DB, td []keyValVersion, level int) {
 		panic(err)
 	}
 
-	if _, err = fd.Write(b.Finish()); err != nil {
+	if _, err = fd.Write(b.Finish(false)); err != nil {
 		panic(err)
 	}
 	tab, err := table.OpenTable(fd, opts)
@@ -740,7 +740,7 @@ func createEmptyTable(db *DB) *table.Table {
 	b.Add(y.KeyWithTs([]byte("foo"), 1), y.ValueStruct{}, 0)
 
 	// Open table in memory to avoid adding changes to manifest file.
-	tab, err := table.OpenInMemoryTable(b.Finish(), db.lc.reserveFileID(), &opts)
+	tab, err := table.OpenInMemoryTable(b.Finish(true), db.lc.reserveFileID(), &opts)
 	if err != nil {
 		panic(err)
 	}
@@ -749,52 +749,6 @@ func createEmptyTable(db *DB) *table.Table {
 }
 
 func TestL0Stall(t *testing.T) {
-	test := func(t *testing.T, opt *Options) {
-		runBadgerTest(t, opt, func(t *testing.T, db *DB) {
-			db.lc.levels[0].Lock()
-			// Add NumLevelZeroTableStall+1 number of tables to level 0. This would fill up level
-			// zero and all new additions are expected to stall if L0 is in memory.
-			for i := 0; i < opt.NumLevelZeroTablesStall+1; i++ {
-				db.lc.levels[0].tables = append(db.lc.levels[0].tables, createEmptyTable(db))
-			}
-			db.lc.levels[0].Unlock()
-
-			timeout := time.After(5 * time.Second)
-			done := make(chan bool)
-
-			go func() {
-				tab := createEmptyTable(db)
-				require.NoError(t, db.lc.addLevel0Table(tab))
-				tab.DecrRef()
-				done <- true
-			}()
-			// Let it stall for a second.
-			time.Sleep(time.Second)
-
-			select {
-			case <-timeout:
-				if opt.KeepL0InMemory {
-					t.Log("Timeout triggered")
-					// Mark this test as successful since L0 is in memory and the
-					// addition of new table to L0 is supposed to stall.
-
-					// Remove tables from level 0 so that the stalled
-					// compaction can make progress. This does not have any
-					// effect on the test. This is done so that the goroutine
-					// stuck on addLevel0Table can make progress and end.
-					db.lc.levels[0].Lock()
-					db.lc.levels[0].tables = nil
-					db.lc.levels[0].Unlock()
-					<-done
-				} else {
-					t.Fatal("Test didn't finish in time")
-				}
-			case <-done:
-				// The test completed before 5 second timeout. Mark it as successful.
-			}
-		})
-	}
-
 	opt := DefaultOptions("")
 	// Disable all compactions.
 	opt.NumCompactors = 0
@@ -803,13 +757,45 @@ func TestL0Stall(t *testing.T) {
 	// Addition of new tables will stall if there are 4 or more L0 tables.
 	opt.NumLevelZeroTablesStall = 4
 
-	t.Run("with KeepL0InMemory", func(t *testing.T) {
-		opt.KeepL0InMemory = true
-		test(t, &opt)
-	})
-	t.Run("with L0 on disk", func(t *testing.T) {
-		opt.KeepL0InMemory = false
-		test(t, &opt)
+	runBadgerTest(t, &opt, func(t *testing.T, db *DB) {
+		db.lc.levels[0].Lock()
+		// Add NumLevelZeroTablesStall+1 tables to level 0. This fills up level
+		// zero, and all new additions are expected to stall.
+		for i := 0; i < opt.NumLevelZeroTablesStall+1; i++ {
+			db.lc.levels[0].tables = append(db.lc.levels[0].tables, createEmptyTable(db))
+		}
+		db.lc.levels[0].Unlock()
+
+		timeout := time.After(5 * time.Second)
+		done := make(chan bool)
+
+		go func() {
+			tab := createEmptyTable(db)
+			require.NoError(t, db.lc.addLevel0Table(tab))
+			tab.DecrRef()
+			done <- true
+		}()
+		// Let it stall for a second.
+		time.Sleep(time.Second)
+
+		select {
+		case <-timeout:
+			t.Log("Timeout triggered")
+			// Mark this test as successful, since the addition of a new
+			// table to L0 is supposed to stall.
+
+			// Remove tables from level 0 so that the stalled
+			// compaction can make progress. This does not have any
+			// effect on the test. This is done so that the goroutine
+			// stuck on addLevel0Table can make progress and end.
+			db.lc.levels[0].Lock()
+			db.lc.levels[0].tables = nil
+			db.lc.levels[0].Unlock()
+			<-done
+		case <-done:
+			// The test completed before the 5 second timeout; the addition did not stall, so fail.
+			t.Fatal("Test did not stall")
+		}
 	})
 }
 
diff --git a/manifest_test.go b/manifest_test.go
index 5062b3f1b..c52e719d5 100644
--- a/manifest_test.go
+++ b/manifest_test.go
@@ -155,7 +155,7 @@ func buildTable(t *testing.T, keyValues [][]string, bopts table.Options) *os.Fil
 			UserMeta: 0,
 		}, 0)
 	}
-	_, err = f.Write(b.Finish())
+	_, err = f.Write(b.Finish(false))
 	require.NoError(t, err, "unable to write to file.")
 	f.Close()
 	f, _ = y.OpenSyncedFile(filename, true)

From 6391efe0ebc15d0d224ff28fe74195eec02bf2e4 Mon Sep 17 00:00:00 2001
From: Ibrahim Jarif
Date: Tue, 18 Aug 2020 17:35:15 +0530
Subject: [PATCH 5/7] fix(compaction): Use separate compactor for L0 and L1

---
 db2_test.go      | 2 +-
 levels.go        | 5 +----
 levels_test.go   | 4 ++--
 manifest_test.go | 2 +-
 4 files changed, 5 insertions(+), 8 deletions(-)

diff --git a/db2_test.go b/db2_test.go
index dadd4966d..4c29d6358 100644
--- a/db2_test.go
+++ b/db2_test.go
@@ -547,7 +547,7 @@ func createTableWithRange(t *testing.T, db *DB, start, end int) *table.Table {
 	fd, err := y.CreateSyncedFile(table.NewFilename(fileID, db.opt.Dir), true)
 	require.NoError(t, err)
 
-	_, err = fd.Write(b.Finish(false))
+	_, err = fd.Write(b.Finish())
 	require.NoError(t, err, "unable to write to file")
 
 	tab, err := table.OpenTable(fd, bopts)
diff --git a/levels.go b/levels.go
index dea27d5bb..498755eea 100644
--- a/levels.go
+++ b/levels.go
@@ -676,7 +676,7 @@ nextTable:
 					return nil, errors.Wrapf(err, "While opening new table: %d", fileID)
 				}
 
-				if _, err := fd.Write(builder.Finish(false)); err != nil {
+				if _, err := fd.Write(builder.Finish()); err != nil {
 					return nil, errors.Wrapf(err, "Unable to write to file: %d", fileID)
 				}
 				tbl, err := table.OpenTable(fd, bopts)
@@ -700,9 +700,6 @@ nextTable:
 
 			mu.Lock()
 			newTables = append(newTables, tbl)
-			num := atomic.LoadInt32(&table.NumBlocks)
-			allocs := float64(atomic.LoadInt64(&y.NumAllocs)) / float64((1 << 20))
-			s.kv.opt.Logger.Debugf("Num Blocks: %d. Num Allocs (MB): %.2f\n", num, allocs)
 			mu.Unlock()
 		}(builder)
 	}
diff --git a/levels_test.go b/levels_test.go
index f8aa65e89..0ef4659c4 100644
--- a/levels_test.go
+++ b/levels_test.go
@@ -49,7 +49,7 @@ func createAndOpen(db *DB, td []keyValVersion, level int) {
 		panic(err)
 	}
 
-	if _, err = fd.Write(b.Finish(false)); err != nil {
+	if _, err = fd.Write(b.Finish()); err != nil {
 		panic(err)
 	}
 	tab, err := table.OpenTable(fd, opts)
@@ -740,7 +740,7 @@ func createEmptyTable(db *DB) *table.Table {
 	b.Add(y.KeyWithTs([]byte("foo"), 1), y.ValueStruct{}, 0)
 
 	// Open table in memory to avoid adding changes to manifest file.
- tab, err := table.OpenInMemoryTable(b.Finish(true), db.lc.reserveFileID(), &opts) + tab, err := table.OpenInMemoryTable(b.Finish(), db.lc.reserveFileID(), &opts) if err != nil { panic(err) } diff --git a/manifest_test.go b/manifest_test.go index c52e719d5..5062b3f1b 100644 --- a/manifest_test.go +++ b/manifest_test.go @@ -155,7 +155,7 @@ func buildTable(t *testing.T, keyValues [][]string, bopts table.Options) *os.Fil UserMeta: 0, }, 0) } - _, err = f.Write(b.Finish(false)) + _, err = f.Write(b.Finish()) require.NoError(t, err, "unable to write to file.") f.Close() f, _ = y.OpenSyncedFile(filename, true) From 2815219cdb3a1d677632448408b0d2947fe45d16 Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Wed, 19 Aug 2020 16:08:04 +0530 Subject: [PATCH 6/7] fix race condition in newtables (cherry picked from commit e5878e70e6836adc163e50065228708543066666) --- levels.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/levels.go b/levels.go index 498755eea..f0af17de4 100644 --- a/levels.go +++ b/levels.go @@ -669,6 +669,7 @@ nextTable: } go func(builder *table.Builder) { defer builder.Close() + defer inflightBuilders.Done(err) build := func(fileID uint64) (*table.Table, error) { fd, err := y.CreateSyncedFile(table.NewFilename(fileID, s.kv.opt.Dir), true) @@ -691,7 +692,6 @@ nextTable: } else { tbl, err = build(fileID) } - inflightBuilders.Done(err) // If we couldn't build the table, return fast. if err != nil { From dcf771e775318f56c3e18020bfefa275337884d5 Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Wed, 19 Aug 2020 18:10:47 +0530 Subject: [PATCH 7/7] Fix persistDiscardStats test --- value_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/value_test.go b/value_test.go index f08348c9c..7a358c16b 100644 --- a/value_test.go +++ b/value_test.go @@ -489,6 +489,8 @@ func TestPersistLFDiscardStats(t *testing.T) { err = db.Close() require.NoError(t, err) + // Avoid running compactors on reopening badger. + opt.NumCompactors = 0 db, err = Open(opt) require.NoError(t, err) defer db.Close()
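
Note on the pattern this series converges on in compactBuildTables: each table build runs in its own goroutine, y.NewThrottle(5) caps the number of in-flight builds, a mutex guards the shared newTables slice, and, after PATCH 6/7, the throttle slot is released only once the finished table has been recorded, so the waiter can never observe a half-populated slice. Below is a minimal standalone sketch of that shape. It is not badger code: a buffered channel stands in for y.Throttle, and buildTable is a hypothetical stand-in for writing and opening a real table.

package main

import (
	"fmt"
	"sync"
)

// buildAll builds n tables with at most maxInflight builds running at once.
func buildAll(n int) ([]string, error) {
	const maxInflight = 5 // mirrors y.NewThrottle(5) in levels.go

	sem := make(chan struct{}, maxInflight) // counting semaphore, like y.Throttle
	var wg sync.WaitGroup

	var mu sync.Mutex // guards newTables and firstErr, like mu in compactBuildTables
	var newTables []string
	var firstErr error

	for i := 0; i < n; i++ {
		sem <- struct{}{} // blocks once maxInflight builds are in flight
		wg.Add(1)
		go func(id int) {
			// Release the slot only after the result has been recorded below,
			// mirroring the move of Done into a defer in PATCH 6/7: the waiter
			// must not resume while an append is still pending.
			defer func() {
				wg.Done()
				<-sem
			}()

			tbl, err := buildTable(id) // hypothetical builder
			mu.Lock()
			defer mu.Unlock()
			if err != nil {
				if firstErr == nil {
					firstErr = err
				}
				return
			}
			newTables = append(newTables, tbl)
		}(i)
	}

	// Like inflightBuilders.Finish(): wait until every build has been recorded.
	wg.Wait()
	return newTables, firstErr
}

// buildTable is a hypothetical stand-in for writing and opening one table.
func buildTable(id int) (string, error) {
	return fmt.Sprintf("%06d.sst", id), nil
}

func main() {
	tables, err := buildAll(20)
	fmt.Println(len(tables), err)
}

One design note on the sketch: the first error is recorded under the same mutex as the results rather than being passed as an argument to a deferred call, because arguments to a deferred function are evaluated when the defer statement executes, not when the deferred call finally runs.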