diff --git a/badger/main.go b/badger/main.go
index 7542f1e17..1d6bcd65e 100644
--- a/badger/main.go
+++ b/badger/main.go
@@ -29,7 +29,7 @@ func main() {
 	go func() {
 		for i := 8080; i < 9080; i++ {
 			fmt.Printf("Listening for /debug HTTP requests at port: %d\n", i)
-			if err := http.ListenAndServe(fmt.Sprintf("localhost:%d", i), nil); err != nil {
+			if err := http.ListenAndServe(fmt.Sprintf("0.0.0.0:%d", i), nil); err != nil {
 				fmt.Println("Port busy. Trying another one...")
 				continue
diff --git a/db.go b/db.go
index e3416515d..b6edb7265 100644
--- a/db.go
+++ b/db.go
@@ -192,6 +192,12 @@ func (db *DB) replayFunction() func(Entry, valuePointer) error {
 
 // Open returns a new DB object.
 func Open(opt Options) (db *DB, err error) {
+	// It's okay to have zero compactors which will disable all compactions but
+	// we cannot have just one compactor otherwise we will end up with all data
+	// on level 2.
+	if opt.NumCompactors == 1 {
+		return nil, errors.New("Cannot have 1 compactor. Need at least 2")
+	}
 	if opt.InMemory && (opt.Dir != "" || opt.ValueDir != "") {
 		return nil, errors.New("Cannot use badger in Disk-less mode with Dir or ValueDir set")
 	}
@@ -528,7 +534,7 @@ func (db *DB) close() (err error) {
 	// Force Compact L0
 	// We don't need to care about cstatus since no parallel compaction is running.
 	if db.opt.CompactL0OnClose {
-		err := db.lc.doCompact(compactionPriority{level: 0, score: 1.73})
+		err := db.lc.doCompact(173, compactionPriority{level: 0, score: 1.73})
 		switch err {
 		case errFillTables:
 			// This error only means that there might be enough tables to do a compaction. So, we
@@ -1455,7 +1461,7 @@ func (db *DB) Flatten(workers int) error {
 	errCh := make(chan error, 1)
 	for i := 0; i < workers; i++ {
 		go func() {
-			errCh <- db.lc.doCompact(cp)
+			errCh <- db.lc.doCompact(175, cp)
 		}()
 	}
 	var success int
diff --git a/level_handler.go b/level_handler.go
index 19ba0892b..ce48965fe 100644
--- a/level_handler.go
+++ b/level_handler.go
@@ -188,9 +188,8 @@ func (s *levelHandler) tryAddLevel0Table(t *table.Table) bool {
 	// Need lock as we may be deleting the first table during a level 0 compaction.
 	s.Lock()
 	defer s.Unlock()
-	// Return false only if L0 is in memory and number of tables is more than number of
-	// ZeroTableStall. For on disk L0, we should just add the tables to the level.
-	if s.db.opt.KeepL0InMemory && len(s.tables) >= s.db.opt.NumLevelZeroTablesStall {
+	// Stall (by returning false) if we are above the specified stall setting for L0.
+	if len(s.tables) >= s.db.opt.NumLevelZeroTablesStall {
 		return false
 	}
 
diff --git a/levels.go b/levels.go
index 78509391c..f0af17de4 100644
--- a/levels.go
+++ b/levels.go
@@ -306,7 +306,7 @@ func (s *levelsController) dropPrefixes(prefixes [][]byte) error {
 		// function in logs, and forces a compaction.
 		dropPrefixes: prefixes,
 	}
-	if err := s.doCompact(cp); err != nil {
+	if err := s.doCompact(174, cp); err != nil {
 		opt.Warningf("While compacting level 0: %v", err)
 		return nil
 	}
@@ -366,11 +366,13 @@ func (s *levelsController) startCompact(lc *y.Closer) {
 	n := s.kv.opt.NumCompactors
 	lc.AddRunning(n - 1)
 	for i := 0; i < n; i++ {
-		go s.runWorker(lc)
+		// The worker with id=0 is dedicated to L0 and L1. It is one of the
+		// NumCompactors workers, which is why at least two compactors are required.
+		go s.runCompactor(i, lc)
 	}
 }
 
-func (s *levelsController) runWorker(lc *y.Closer) {
+func (s *levelsController) runCompactor(id int, lc *y.Closer) {
 	defer lc.Done()
 
 	randomDelay := time.NewTimer(time.Duration(rand.Int31n(1000)) * time.Millisecond)
@@ -381,7 +383,7 @@ func (s *levelsController) runWorker(lc *y.Closer) {
 		return
 	}
 
-	ticker := time.NewTicker(time.Second)
+	ticker := time.NewTicker(100 * time.Millisecond)
 	defer ticker.Stop()
 
 	for {
@@ -391,7 +393,15 @@ func (s *levelsController) runWorker(lc *y.Closer) {
 			prios := s.pickCompactLevels()
 		loop:
 			for _, p := range prios {
-				err := s.doCompact(p)
+				if id == 0 && p.level > 1 {
+					// If I'm ID zero, I only compact L0 and L1.
+					continue
+				}
+				if id != 0 && p.level <= 1 {
+					// If I'm ID non-zero, I do NOT compact L0 and L1.
+					continue
+				}
+				err := s.doCompact(id, p)
 				switch err {
 				case nil:
 					break loop
@@ -453,10 +463,11 @@ func (s *levelsController) pickCompactLevels() (prios []compactionPriority) {
 			prios = append(prios, pri)
 		}
 	}
-	// We used to sort compaction priorities based on the score. But, we
-	// decided to compact based on the level, not the priority. So, upper
-	// levels (level 0, level 1, etc) always get compacted first, before the
-	// lower levels -- this allows us to avoid stalls.
+	// We should continue to sort the compaction priorities by score. Now that we have a dedicated
+	// compactor for L0 and L1, we don't need to sort by level here.
+	sort.Slice(prios, func(i, j int) bool {
+		return prios[i].score > prios[j].score
+	})
 	return prios
 }
 
@@ -541,15 +552,13 @@ nextTable:
 	// that would affect the snapshot view guarantee provided by transactions.
 	discardTs := s.kv.orc.discardAtOrBelow()
 
-	// Start generating new tables.
-	type newTableResult struct {
-		table *table.Table
-		err   error
-	}
-	resultCh := make(chan newTableResult)
 	var numBuilds, numVersions int
 	var lastKey, skipKey []byte
 	var vp valuePointer
+	var newTables []*table.Table
+	mu := new(sync.Mutex) // Guards newTables
+
+	inflightBuilders := y.NewThrottle(5)
 	for it.Valid() {
 		timeStart := time.Now()
 		dk, err := s.kv.registry.latestDataKey()
@@ -646,19 +655,6 @@ nextTable:
 		// called Add() at least once, and builder is not Empty().
 		s.kv.opt.Debugf("LOG Compact. Added %d keys. Skipped %d keys. Iteration took: %v",
 			numKeys, numSkips, time.Since(timeStart))
-		build := func(fileID uint64) (*table.Table, error) {
-			fd, err := y.CreateSyncedFile(table.NewFilename(fileID, s.kv.opt.Dir), true)
-			if err != nil {
-				return nil, errors.Wrapf(err, "While opening new table: %d", fileID)
-			}
-
-			if _, err := fd.Write(builder.Finish()); err != nil {
-				return nil, errors.Wrapf(err, "Unable to write to file: %d", fileID)
-			}
-			tbl, err := table.OpenTable(fd, bopts)
-			// decrRef is added below.
-			return tbl, errors.Wrapf(err, "Unable to open table: %q", fd.Name())
-		}
 		if builder.Empty() {
 			// Cleanup builder resources:
 			builder.Finish()
@@ -667,49 +663,61 @@ nextTable:
 		}
 		numBuilds++
 		fileID := s.reserveFileID()
+		if err := inflightBuilders.Do(); err != nil {
+			// Can't return from here, until I decrRef all the tables that I built so far.
+			break
+		}
 		go func(builder *table.Builder) {
 			defer builder.Close()
-			var (
-				tbl *table.Table
-				err error
-			)
+			defer inflightBuilders.Done(err)
+
+			build := func(fileID uint64) (*table.Table, error) {
+				fd, err := y.CreateSyncedFile(table.NewFilename(fileID, s.kv.opt.Dir), true)
+				if err != nil {
+					return nil, errors.Wrapf(err, "While opening new table: %d", fileID)
+				}
+
+				if _, err := fd.Write(builder.Finish()); err != nil {
+					return nil, errors.Wrapf(err, "Unable to write to file: %d", fileID)
+				}
+				tbl, err := table.OpenTable(fd, bopts)
+				// decrRef is added below.
+				return tbl, errors.Wrapf(err, "Unable to open table: %q", fd.Name())
+			}
+
+			var tbl *table.Table
+			var err error
 			if s.kv.opt.InMemory {
 				tbl, err = table.OpenInMemoryTable(builder.Finish(), fileID, &bopts)
 			} else {
 				tbl, err = build(fileID)
 			}
-			resultCh <- newTableResult{tbl, err}
-		}(builder)
-	}
-
-	newTables := make([]*table.Table, 0, 20)
-	// Wait for all table builders to finish.
-	var firstErr error
-	for x := 0; x < numBuilds; x++ {
-		res := <-resultCh
-		newTables = append(newTables, res.table)
-		if firstErr == nil {
-			firstErr = res.err
-		}
+
+			// If we couldn't build the table, return fast.
+			if err != nil {
+				return
+			}
+
+			mu.Lock()
+			newTables = append(newTables, tbl)
+			mu.Unlock()
+		}(builder)
 	}
 
-	if firstErr == nil {
+	// Wait for all table builders to finish and also for newTables accumulator to finish.
+	err := inflightBuilders.Finish()
+	if err == nil {
 		// Ensure created files' directory entries are visible. We don't mind the extra latency
 		// from not doing this ASAP after all file creation has finished because this is a
 		// background operation.
-		firstErr = s.kv.syncDir(s.kv.opt.Dir)
+		err = s.kv.syncDir(s.kv.opt.Dir)
 	}
 
-	if firstErr != nil {
+	if err != nil {
 		// An error happened. Delete all the newly created table files (by calling DecrRef
 		// -- we're the only holders of a ref).
-		for j := 0; j < numBuilds; j++ {
-			if newTables[j] != nil {
-				_ = newTables[j].DecrRef()
-			}
-		}
-		errorReturn := errors.Wrapf(firstErr, "While running compaction for: %+v", cd)
-		return nil, nil, errorReturn
+		_ = decrRefs(newTables)
+		return nil, nil, errors.Wrapf(err, "while running compactions for: %+v", cd)
 	}
 
 	sort.Slice(newTables, func(i, j int) bool {
@@ -963,7 +971,7 @@ func (s *levelsController) runCompactDef(l int, cd compactDef) (err error) {
 var errFillTables = errors.New("Unable to fill tables")
 
 // doCompact picks some table on level l and compacts it away to the next level.
-func (s *levelsController) doCompact(p compactionPriority) error {
+func (s *levelsController) doCompact(id int, p compactionPriority) error {
 	l := p.level
 	y.AssertTrue(l+1 < s.kv.opt.MaxLevels) // Sanity check.
 
@@ -976,7 +984,7 @@ func (s *levelsController) doCompact(p compactionPriority) error {
 	cd.elog.SetMaxEvents(100)
 	defer cd.elog.Finish()
 
-	s.kv.opt.Infof("Got compaction priority: %+v", p)
+	s.kv.opt.Debugf("[Compactor: %d] Attempting to run compaction: %+v", id, p)
 
 	// While picking tables to be compacted, both levels' tables are expected to
 	// remain unchanged.
@@ -992,16 +1000,17 @@ func (s *levelsController) doCompact(p compactionPriority) error {
 	}
 	defer s.cstatus.delete(cd) // Remove the ranges from compaction status.
 
-	s.kv.opt.Infof("Running for level: %d\n", cd.thisLevel.level)
+	s.kv.opt.Infof("[Compactor: %d] Running compaction: %+v for level: %d\n",
+		id, p, cd.thisLevel.level)
 	s.cstatus.toLog(cd.elog)
 	if err := s.runCompactDef(l, cd); err != nil {
 		// This compaction couldn't be done successfully.
- s.kv.opt.Warningf("LOG Compact FAILED with error: %+v: %+v", err, cd) + s.kv.opt.Warningf("[Compactor: %d] LOG Compact FAILED with error: %+v: %+v", id, err, cd) return err } s.cstatus.toLog(cd.elog) - s.kv.opt.Infof("Compaction for level: %d DONE", cd.thisLevel.level) + s.kv.opt.Infof("[Compactor: %d] Compaction for level: %d DONE", id, cd.thisLevel.level) return nil } @@ -1025,7 +1034,7 @@ func (s *levelsController) addLevel0Table(t *table.Table) error { // Stall. Make sure all levels are healthy before we unstall. var timeStart time.Time { - s.kv.opt.Debugf("STALLED STALLED STALLED: %v\n", time.Since(s.lastUnstalled)) + s.kv.opt.Infof("STALLED STALLED STALLED: %v\n", time.Since(s.lastUnstalled)) s.cstatus.RLock() for i := 0; i < s.kv.opt.MaxLevels; i++ { s.kv.opt.Debugf("level=%d. Status=%s Size=%d\n", diff --git a/levels_test.go b/levels_test.go index b50217ed3..0ef4659c4 100644 --- a/levels_test.go +++ b/levels_test.go @@ -749,52 +749,6 @@ func createEmptyTable(db *DB) *table.Table { } func TestL0Stall(t *testing.T) { - test := func(t *testing.T, opt *Options) { - runBadgerTest(t, opt, func(t *testing.T, db *DB) { - db.lc.levels[0].Lock() - // Add NumLevelZeroTableStall+1 number of tables to level 0. This would fill up level - // zero and all new additions are expected to stall if L0 is in memory. - for i := 0; i < opt.NumLevelZeroTablesStall+1; i++ { - db.lc.levels[0].tables = append(db.lc.levels[0].tables, createEmptyTable(db)) - } - db.lc.levels[0].Unlock() - - timeout := time.After(5 * time.Second) - done := make(chan bool) - - go func() { - tab := createEmptyTable(db) - require.NoError(t, db.lc.addLevel0Table(tab)) - tab.DecrRef() - done <- true - }() - // Let it stall for a second. - time.Sleep(time.Second) - - select { - case <-timeout: - if opt.KeepL0InMemory { - t.Log("Timeout triggered") - // Mark this test as successful since L0 is in memory and the - // addition of new table to L0 is supposed to stall. - - // Remove tables from level 0 so that the stalled - // compaction can make progress. This does not have any - // effect on the test. This is done so that the goroutine - // stuck on addLevel0Table can make progress and end. - db.lc.levels[0].Lock() - db.lc.levels[0].tables = nil - db.lc.levels[0].Unlock() - <-done - } else { - t.Fatal("Test didn't finish in time") - } - case <-done: - // The test completed before 5 second timeout. Mark it as successful. - } - }) - } - opt := DefaultOptions("") // Disable all compactions. opt.NumCompactors = 0 @@ -803,13 +757,45 @@ func TestL0Stall(t *testing.T) { // Addition of new tables will stall if there are 4 or more L0 tables. opt.NumLevelZeroTablesStall = 4 - t.Run("with KeepL0InMemory", func(t *testing.T) { - opt.KeepL0InMemory = true - test(t, &opt) - }) - t.Run("with L0 on disk", func(t *testing.T) { - opt.KeepL0InMemory = false - test(t, &opt) + runBadgerTest(t, &opt, func(t *testing.T, db *DB) { + db.lc.levels[0].Lock() + // Add NumLevelZeroTableStall+1 number of tables to level 0. This would fill up level + // zero and all new additions are expected to stall if L0 is in memory. + for i := 0; i < opt.NumLevelZeroTablesStall+1; i++ { + db.lc.levels[0].tables = append(db.lc.levels[0].tables, createEmptyTable(db)) + } + db.lc.levels[0].Unlock() + + timeout := time.After(5 * time.Second) + done := make(chan bool) + + go func() { + tab := createEmptyTable(db) + require.NoError(t, db.lc.addLevel0Table(tab)) + tab.DecrRef() + done <- true + }() + // Let it stall for a second. 
+		time.Sleep(time.Second)
+
+		select {
+		case <-timeout:
+			t.Log("Timeout triggered")
+			// Mark this test as successful since the addition of a new
+			// table to L0 is supposed to stall.
+
+			// Remove tables from level 0 so that the stalled
+			// compaction can make progress. This does not have any
+			// effect on the test. This is done so that the goroutine
+			// stuck on addLevel0Table can make progress and end.
+			db.lc.levels[0].Lock()
+			db.lc.levels[0].tables = nil
+			db.lc.levels[0].Unlock()
+			<-done
+		case <-done:
+			// The test completed before the 5 second timeout, so the table addition did not stall. Fail the test.
+			t.Fatal("Test did not stall")
+		}
 	})
 }
 
diff --git a/options.go b/options.go
index 36d073b78..80e641376 100644
--- a/options.go
+++ b/options.go
@@ -129,9 +129,9 @@ func DefaultOptions(path string) Options {
 		// table.Nothing to not preload the tables.
 		MaxLevels:               7,
 		MaxTableSize:            64 << 20,
-		NumCompactors:           2, // Compactions can be expensive. Only run 2.
+		NumCompactors:           2, // Run at least 2 compactors. One is dedicated for L0.
 		NumLevelZeroTables:      5,
-		NumLevelZeroTablesStall: 10,
+		NumLevelZeroTablesStall: 15,
 		NumMemtables:            5,
 		BloomFalsePositive:      0.01,
 		BlockSize:               4 * 1024,
@@ -460,7 +460,7 @@ func (opt Options) WithValueLogMaxEntries(val uint32) Options {
 // NumCompactors sets the number of compaction workers to run concurrently.
 // Setting this to zero stops compactions, which could eventually cause writes to block forever.
 //
-// The default value of NumCompactors is 2.
+// The default value of NumCompactors is 2. One is dedicated just for L0.
 func (opt Options) WithNumCompactors(val int) Options {
 	opt.NumCompactors = val
 	return opt
diff --git a/value_test.go b/value_test.go
index f08348c9c..7a358c16b 100644
--- a/value_test.go
+++ b/value_test.go
@@ -489,6 +489,8 @@ func TestPersistLFDiscardStats(t *testing.T) {
 	err = db.Close()
 	require.NoError(t, err)
 
+	// Avoid running compactors on reopening badger.
+	opt.NumCompactors = 0
 	db, err = Open(opt)
 	require.NoError(t, err)
 	defer db.Close()