Skip to content

Commit 6d05358

Browse files
author
Ibrahim Jarif
authored
fix(compaction): Use separate compactors for L0, L1 (#1466) (#1468)
Related to #1459 This PR contains the following changes to compactions - Use a separate thread for compacting Level 0 and 1 and a separate one for other levels - Pick levels to compact based on score. - Stall Level 0 if compactions cannot keep up (we had added this in #1186) - Limit the number of open table builders to 5 in compactions. (cherry picked from commit 0b8eb4c)
1 parent df07404 commit 6d05358

File tree

7 files changed

+124
-122
lines changed

7 files changed

+124
-122
lines changed

badger/main.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ func main() {
2929
go func() {
3030
for i := 8080; i < 9080; i++ {
3131
fmt.Printf("Listening for /debug HTTP requests at port: %d\n", i)
32-
if err := http.ListenAndServe(fmt.Sprintf("localhost:%d", i), nil); err != nil {
32+
if err := http.ListenAndServe(fmt.Sprintf("0.0.0.0:%d", i), nil); err != nil {
3333
fmt.Println("Port busy. Trying another one...")
3434
continue
3535

db.go

+8-2
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,12 @@ func (db *DB) replayFunction() func(Entry, valuePointer) error {
192192

193193
// Open returns a new DB object.
194194
func Open(opt Options) (db *DB, err error) {
195+
// It's okay to have zero compactors which will disable all compactions but
196+
// we cannot have just one compactor otherwise we will end up with all data
197+
// on level 2.
198+
if opt.NumCompactors == 1 {
199+
return nil, errors.New("Cannot have 1 compactor. Need at least 2")
200+
}
195201
if opt.InMemory && (opt.Dir != "" || opt.ValueDir != "") {
196202
return nil, errors.New("Cannot use badger in Disk-less mode with Dir or ValueDir set")
197203
}
@@ -528,7 +534,7 @@ func (db *DB) close() (err error) {
528534
// Force Compact L0
529535
// We don't need to care about cstatus since no parallel compaction is running.
530536
if db.opt.CompactL0OnClose {
531-
err := db.lc.doCompact(compactionPriority{level: 0, score: 1.73})
537+
err := db.lc.doCompact(173, compactionPriority{level: 0, score: 1.73})
532538
switch err {
533539
case errFillTables:
534540
// This error only means that there might be enough tables to do a compaction. So, we
@@ -1455,7 +1461,7 @@ func (db *DB) Flatten(workers int) error {
14551461
errCh := make(chan error, 1)
14561462
for i := 0; i < workers; i++ {
14571463
go func() {
1458-
errCh <- db.lc.doCompact(cp)
1464+
errCh <- db.lc.doCompact(175, cp)
14591465
}()
14601466
}
14611467
var success int

level_handler.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -188,9 +188,8 @@ func (s *levelHandler) tryAddLevel0Table(t *table.Table) bool {
188188
// Need lock as we may be deleting the first table during a level 0 compaction.
189189
s.Lock()
190190
defer s.Unlock()
191-
// Return false only if L0 is in memory and number of tables is more than number of
192-
// ZeroTableStall. For on disk L0, we should just add the tables to the level.
193-
if s.db.opt.KeepL0InMemory && len(s.tables) >= s.db.opt.NumLevelZeroTablesStall {
191+
// Stall (by returning false) if we are above the specified stall setting for L0.
192+
if len(s.tables) >= s.db.opt.NumLevelZeroTablesStall {
194193
return false
195194
}
196195

levels.go

+69-60
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,7 @@ func (s *levelsController) dropPrefixes(prefixes [][]byte) error {
306306
// function in logs, and forces a compaction.
307307
dropPrefixes: prefixes,
308308
}
309-
if err := s.doCompact(cp); err != nil {
309+
if err := s.doCompact(174, cp); err != nil {
310310
opt.Warningf("While compacting level 0: %v", err)
311311
return nil
312312
}
@@ -366,11 +366,13 @@ func (s *levelsController) startCompact(lc *y.Closer) {
366366
n := s.kv.opt.NumCompactors
367367
lc.AddRunning(n - 1)
368368
for i := 0; i < n; i++ {
369-
go s.runWorker(lc)
369+
// The worker with id=0 is dedicated to L0 and L1. This is not counted
370+
// towards the user specified NumCompactors.
371+
go s.runCompactor(i, lc)
370372
}
371373
}
372374

373-
func (s *levelsController) runWorker(lc *y.Closer) {
375+
func (s *levelsController) runCompactor(id int, lc *y.Closer) {
374376
defer lc.Done()
375377

376378
randomDelay := time.NewTimer(time.Duration(rand.Int31n(1000)) * time.Millisecond)
@@ -381,7 +383,7 @@ func (s *levelsController) runWorker(lc *y.Closer) {
381383
return
382384
}
383385

384-
ticker := time.NewTicker(time.Second)
386+
ticker := time.NewTicker(100 * time.Millisecond)
385387
defer ticker.Stop()
386388

387389
for {
@@ -391,7 +393,15 @@ func (s *levelsController) runWorker(lc *y.Closer) {
391393
prios := s.pickCompactLevels()
392394
loop:
393395
for _, p := range prios {
394-
err := s.doCompact(p)
396+
if id == 0 && p.level > 1 {
397+
// If I'm ID zero, I only compact L0 and L1.
398+
continue
399+
}
400+
if id != 0 && p.level <= 1 {
401+
// If I'm ID non-zero, I do NOT compact L0 and L1.
402+
continue
403+
}
404+
err := s.doCompact(id, p)
395405
switch err {
396406
case nil:
397407
break loop
@@ -453,10 +463,11 @@ func (s *levelsController) pickCompactLevels() (prios []compactionPriority) {
453463
prios = append(prios, pri)
454464
}
455465
}
456-
// We used to sort compaction priorities based on the score. But, we
457-
// decided to compact based on the level, not the priority. So, upper
458-
// levels (level 0, level 1, etc) always get compacted first, before the
459-
// lower levels -- this allows us to avoid stalls.
466+
// We should continue to sort the compaction priorities by score. Now that we have a dedicated
467+
// compactor for L0 and L1, we don't need to sort by level here.
468+
sort.Slice(prios, func(i, j int) bool {
469+
return prios[i].score > prios[j].score
470+
})
460471
return prios
461472
}
462473

@@ -541,15 +552,13 @@ nextTable:
541552
// that would affect the snapshot view guarantee provided by transactions.
542553
discardTs := s.kv.orc.discardAtOrBelow()
543554

544-
// Start generating new tables.
545-
type newTableResult struct {
546-
table *table.Table
547-
err error
548-
}
549-
resultCh := make(chan newTableResult)
550555
var numBuilds, numVersions int
551556
var lastKey, skipKey []byte
552557
var vp valuePointer
558+
var newTables []*table.Table
559+
mu := new(sync.Mutex) // Guards newTables
560+
561+
inflightBuilders := y.NewThrottle(5)
553562
for it.Valid() {
554563
timeStart := time.Now()
555564
dk, err := s.kv.registry.latestDataKey()
@@ -646,67 +655,66 @@ nextTable:
646655
// called Add() at least once, and builder is not Empty().
647656
s.kv.opt.Debugf("LOG Compact. Added %d keys. Skipped %d keys. Iteration took: %v",
648657
numKeys, numSkips, time.Since(timeStart))
649-
build := func(fileID uint64) (*table.Table, error) {
650-
fd, err := y.CreateSyncedFile(table.NewFilename(fileID, s.kv.opt.Dir), true)
651-
if err != nil {
652-
return nil, errors.Wrapf(err, "While opening new table: %d", fileID)
653-
}
654-
655-
if _, err := fd.Write(builder.Finish()); err != nil {
656-
return nil, errors.Wrapf(err, "Unable to write to file: %d", fileID)
657-
}
658-
tbl, err := table.OpenTable(fd, bopts)
659-
// decrRef is added below.
660-
return tbl, errors.Wrapf(err, "Unable to open table: %q", fd.Name())
661-
}
662658
if builder.Empty() {
663659
continue
664660
}
665661
numBuilds++
666662
fileID := s.reserveFileID()
663+
if err := inflightBuilders.Do(); err != nil {
664+
// Can't return from here, until I decrRef all the tables that I built so far.
665+
break
666+
}
667667
go func(builder *table.Builder) {
668668
defer builder.Close()
669-
var (
670-
tbl *table.Table
671-
err error
672-
)
669+
defer inflightBuilders.Done(err)
670+
671+
build := func(fileID uint64) (*table.Table, error) {
672+
fd, err := y.CreateSyncedFile(table.NewFilename(fileID, s.kv.opt.Dir), true)
673+
if err != nil {
674+
return nil, errors.Wrapf(err, "While opening new table: %d", fileID)
675+
}
676+
677+
if _, err := fd.Write(builder.Finish()); err != nil {
678+
return nil, errors.Wrapf(err, "Unable to write to file: %d", fileID)
679+
}
680+
tbl, err := table.OpenTable(fd, bopts)
681+
// decrRef is added below.
682+
return tbl, errors.Wrapf(err, "Unable to open table: %q", fd.Name())
683+
}
684+
685+
var tbl *table.Table
686+
var err error
673687
if s.kv.opt.InMemory {
674688
tbl, err = table.OpenInMemoryTable(builder.Finish(), fileID, &bopts)
675689
} else {
676690
tbl, err = build(fileID)
677691
}
678-
resultCh <- newTableResult{tbl, err}
679-
}(builder)
680-
}
681692

682-
newTables := make([]*table.Table, 0, 20)
683-
// Wait for all table builders to finish.
684-
var firstErr error
685-
for x := 0; x < numBuilds; x++ {
686-
res := <-resultCh
687-
newTables = append(newTables, res.table)
688-
if firstErr == nil {
689-
firstErr = res.err
690-
}
693+
// If we couldn't build the table, return fast.
694+
if err != nil {
695+
return
696+
}
697+
698+
mu.Lock()
699+
newTables = append(newTables, tbl)
700+
mu.Unlock()
701+
}(builder)
691702
}
692703

693-
if firstErr == nil {
704+
// Wait for all table builders to finish and also for newTables accumulator to finish.
705+
err := inflightBuilders.Finish()
706+
if err == nil {
694707
// Ensure created files' directory entries are visible. We don't mind the extra latency
695708
// from not doing this ASAP after all file creation has finished because this is a
696709
// background operation.
697-
firstErr = s.kv.syncDir(s.kv.opt.Dir)
710+
err = s.kv.syncDir(s.kv.opt.Dir)
698711
}
699712

700-
if firstErr != nil {
713+
if err != nil {
701714
// An error happened. Delete all the newly created table files (by calling DecrRef
702715
// -- we're the only holders of a ref).
703-
for j := 0; j < numBuilds; j++ {
704-
if newTables[j] != nil {
705-
_ = newTables[j].DecrRef()
706-
}
707-
}
708-
errorReturn := errors.Wrapf(firstErr, "While running compaction for: %+v", cd)
709-
return nil, nil, errorReturn
716+
_ = decrRefs(newTables)
717+
return nil, nil, errors.Wrapf(err, "while running compactions for: %+v", cd)
710718
}
711719

712720
sort.Slice(newTables, func(i, j int) bool {
@@ -960,7 +968,7 @@ func (s *levelsController) runCompactDef(l int, cd compactDef) (err error) {
960968
var errFillTables = errors.New("Unable to fill tables")
961969

962970
// doCompact picks some table on level l and compacts it away to the next level.
963-
func (s *levelsController) doCompact(p compactionPriority) error {
971+
func (s *levelsController) doCompact(id int, p compactionPriority) error {
964972
l := p.level
965973
y.AssertTrue(l+1 < s.kv.opt.MaxLevels) // Sanity check.
966974

@@ -973,7 +981,7 @@ func (s *levelsController) doCompact(p compactionPriority) error {
973981
cd.elog.SetMaxEvents(100)
974982
defer cd.elog.Finish()
975983

976-
s.kv.opt.Infof("Got compaction priority: %+v", p)
984+
s.kv.opt.Debugf("[Compactor: %d] Attempting to run compaction: %+v", id, p)
977985

978986
// While picking tables to be compacted, both levels' tables are expected to
979987
// remain unchanged.
@@ -989,16 +997,17 @@ func (s *levelsController) doCompact(p compactionPriority) error {
989997
}
990998
defer s.cstatus.delete(cd) // Remove the ranges from compaction status.
991999

992-
s.kv.opt.Infof("Running for level: %d\n", cd.thisLevel.level)
1000+
s.kv.opt.Infof("[Compactor: %d] Running compaction: %+v for level: %d\n",
1001+
id, p, cd.thisLevel.level)
9931002
s.cstatus.toLog(cd.elog)
9941003
if err := s.runCompactDef(l, cd); err != nil {
9951004
// This compaction couldn't be done successfully.
996-
s.kv.opt.Warningf("LOG Compact FAILED with error: %+v: %+v", err, cd)
1005+
s.kv.opt.Warningf("[Compactor: %d] LOG Compact FAILED with error: %+v: %+v", id, err, cd)
9971006
return err
9981007
}
9991008

10001009
s.cstatus.toLog(cd.elog)
1001-
s.kv.opt.Infof("Compaction for level: %d DONE", cd.thisLevel.level)
1010+
s.kv.opt.Infof("[Compactor: %d] Compaction for level: %d DONE", id, cd.thisLevel.level)
10021011
return nil
10031012
}
10041013

@@ -1022,7 +1031,7 @@ func (s *levelsController) addLevel0Table(t *table.Table) error {
10221031
// Stall. Make sure all levels are healthy before we unstall.
10231032
var timeStart time.Time
10241033
{
1025-
s.kv.opt.Debugf("STALLED STALLED STALLED: %v\n", time.Since(s.lastUnstalled))
1034+
s.kv.opt.Infof("STALLED STALLED STALLED: %v\n", time.Since(s.lastUnstalled))
10261035
s.cstatus.RLock()
10271036
for i := 0; i < s.kv.opt.MaxLevels; i++ {
10281037
s.kv.opt.Debugf("level=%d. Status=%s Size=%d\n",

levels_test.go

+39-53
Original file line numberDiff line numberDiff line change
@@ -749,52 +749,6 @@ func createEmptyTable(db *DB) *table.Table {
749749
}
750750

751751
func TestL0Stall(t *testing.T) {
752-
test := func(t *testing.T, opt *Options) {
753-
runBadgerTest(t, opt, func(t *testing.T, db *DB) {
754-
db.lc.levels[0].Lock()
755-
// Add NumLevelZeroTableStall+1 number of tables to level 0. This would fill up level
756-
// zero and all new additions are expected to stall if L0 is in memory.
757-
for i := 0; i < opt.NumLevelZeroTablesStall+1; i++ {
758-
db.lc.levels[0].tables = append(db.lc.levels[0].tables, createEmptyTable(db))
759-
}
760-
db.lc.levels[0].Unlock()
761-
762-
timeout := time.After(5 * time.Second)
763-
done := make(chan bool)
764-
765-
go func() {
766-
tab := createEmptyTable(db)
767-
require.NoError(t, db.lc.addLevel0Table(tab))
768-
tab.DecrRef()
769-
done <- true
770-
}()
771-
// Let it stall for a second.
772-
time.Sleep(time.Second)
773-
774-
select {
775-
case <-timeout:
776-
if opt.KeepL0InMemory {
777-
t.Log("Timeout triggered")
778-
// Mark this test as successful since L0 is in memory and the
779-
// addition of new table to L0 is supposed to stall.
780-
781-
// Remove tables from level 0 so that the stalled
782-
// compaction can make progress. This does not have any
783-
// effect on the test. This is done so that the goroutine
784-
// stuck on addLevel0Table can make progress and end.
785-
db.lc.levels[0].Lock()
786-
db.lc.levels[0].tables = nil
787-
db.lc.levels[0].Unlock()
788-
<-done
789-
} else {
790-
t.Fatal("Test didn't finish in time")
791-
}
792-
case <-done:
793-
// The test completed before 5 second timeout. Mark it as successful.
794-
}
795-
})
796-
}
797-
798752
opt := DefaultOptions("")
799753
// Disable all compactions.
800754
opt.NumCompactors = 0
@@ -803,13 +757,45 @@ func TestL0Stall(t *testing.T) {
803757
// Addition of new tables will stall if there are 4 or more L0 tables.
804758
opt.NumLevelZeroTablesStall = 4
805759

806-
t.Run("with KeepL0InMemory", func(t *testing.T) {
807-
opt.KeepL0InMemory = true
808-
test(t, &opt)
809-
})
810-
t.Run("with L0 on disk", func(t *testing.T) {
811-
opt.KeepL0InMemory = false
812-
test(t, &opt)
760+
runBadgerTest(t, &opt, func(t *testing.T, db *DB) {
761+
db.lc.levels[0].Lock()
762+
// Add NumLevelZeroTablesStall+1 number of tables to level 0. This would fill up level
763+
// zero, and all new additions are expected to stall.
764+
for i := 0; i < opt.NumLevelZeroTablesStall+1; i++ {
765+
db.lc.levels[0].tables = append(db.lc.levels[0].tables, createEmptyTable(db))
766+
}
767+
db.lc.levels[0].Unlock()
768+
769+
timeout := time.After(5 * time.Second)
770+
done := make(chan bool)
771+
772+
go func() {
773+
tab := createEmptyTable(db)
774+
require.NoError(t, db.lc.addLevel0Table(tab))
775+
tab.DecrRef()
776+
done <- true
777+
}()
778+
// Let it stall for a second.
779+
time.Sleep(time.Second)
780+
781+
select {
782+
case <-timeout:
783+
t.Log("Timeout triggered")
784+
// Mark this test as successful since L0 is in memory and the
785+
// addition of new table to L0 is supposed to stall.
786+
787+
// Remove tables from level 0 so that the stalled
788+
// compaction can make progress. This does not have any
789+
// effect on the test. This is done so that the goroutine
790+
// stuck on addLevel0Table can make progress and end.
791+
db.lc.levels[0].Lock()
792+
db.lc.levels[0].tables = nil
793+
db.lc.levels[0].Unlock()
794+
<-done
795+
case <-done:
796+
// The test completed before the 5 second timeout, meaning the addition
// did not stall — that is a failure.
797+
t.Fatal("Test did not stall")
798+
}
813799
})
814800
}
815801

0 commit comments

Comments
 (0)