fix(compaction): Use separate compactors for L0, L1 #1466

Merged · 7 commits · Aug 19, 2020
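Summary of the change: compaction workers now get an id, and the worker with id 0 handles only L0 and L1 while the remaining workers skip those levels, so a slow lower-level compaction can no longer starve L0. The sketch below restates that dispatch rule outside of Badger; compactionPriority is trimmed to the two fields used here, and dispatch is a hypothetical helper written for illustration, not code from this PR.

    package main

    import "fmt"

    // compactionPriority is a simplified stand-in for Badger's struct,
    // keeping only the fields the new dispatch rule looks at.
    type compactionPriority struct {
    	level int
    	score float64
    }

    // dispatch mirrors the rule added in runCompactor: the worker with id 0
    // only takes L0/L1 work, every other worker only takes L2 and below.
    func dispatch(id int, p compactionPriority) bool {
    	if id == 0 {
    		return p.level <= 1
    	}
    	return p.level > 1
    }

    func main() {
    	prios := []compactionPriority{{level: 0, score: 1.73}, {level: 3, score: 1.2}}
    	for id := 0; id < 2; id++ {
    		for _, p := range prios {
    			fmt.Printf("compactor %d takes L%d: %v\n", id, p.level, dispatch(id, p))
    		}
    	}
    }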
2 changes: 1 addition & 1 deletion badger/main.go
@@ -29,7 +29,7 @@ func main() {
go func() {
for i := 8080; i < 9080; i++ {
fmt.Printf("Listening for /debug HTTP requests at port: %d\n", i)
if err := http.ListenAndServe(fmt.Sprintf("localhost:%d", i), nil); err != nil {
if err := http.ListenAndServe(fmt.Sprintf("0.0.0.0:%d", i), nil); err != nil {
fmt.Println("Port busy. Trying another one...")
continue

10 changes: 8 additions & 2 deletions db.go
@@ -192,6 +192,12 @@ func (db *DB) replayFunction() func(Entry, valuePointer) error {

// Open returns a new DB object.
func Open(opt Options) (db *DB, err error) {
// It's okay to have zero compactors which will disable all compactions but
// we cannot have just one compactor otherwise we will end up with all data
// on level 2.
if opt.NumCompactors == 1 {
return nil, errors.New("Cannot have 1 compactor. Need at least 2")
}
if opt.InMemory && (opt.Dir != "" || opt.ValueDir != "") {
return nil, errors.New("Cannot use badger in Disk-less mode with Dir or ValueDir set")
}
@@ -528,7 +534,7 @@ func (db *DB) close() (err error) {
// Force Compact L0
// We don't need to care about cstatus since no parallel compaction is running.
if db.opt.CompactL0OnClose {
err := db.lc.doCompact(compactionPriority{level: 0, score: 1.73})
err := db.lc.doCompact(173, compactionPriority{level: 0, score: 1.73})
switch err {
case errFillTables:
// This error only means that there might be enough tables to do a compaction. So, we
@@ -1455,7 +1461,7 @@ func (db *DB) Flatten(workers int) error {
errCh := make(chan error, 1)
for i := 0; i < workers; i++ {
go func() {
errCh <- db.lc.doCompact(cp)
errCh <- db.lc.doCompact(175, cp)
}()
}
var success int
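The new validation in Open means callers must configure either zero compactors (all compactions disabled) or at least two, since one of them is now reserved for L0/L1. A minimal usage sketch, assuming the v2 options API with DefaultOptions and WithNumCompactors:

    package main

    import (
    	"log"

    	badger "github.com/dgraph-io/badger/v2"
    )

    func main() {
    	// NumCompactors must be 0 (compactions off) or >= 2; passing 1 makes
    	// Open fail after this PR, because the single worker would be pinned
    	// to L0/L1 and nothing would ever compact the lower levels.
    	opts := badger.DefaultOptions("/tmp/badger").WithNumCompactors(3)
    	db, err := badger.Open(opts)
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer db.Close()
    }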
5 changes: 2 additions & 3 deletions level_handler.go
@@ -188,9 +188,8 @@ func (s *levelHandler) tryAddLevel0Table(t *table.Table) bool {
// Need lock as we may be deleting the first table during a level 0 compaction.
s.Lock()
defer s.Unlock()
// Return false only if L0 is in memory and number of tables is more than number of
// ZeroTableStall. For on disk L0, we should just add the tables to the level.
if s.db.opt.KeepL0InMemory && len(s.tables) >= s.db.opt.NumLevelZeroTablesStall {
// Stall (by returning false) if we are above the specified stall setting for L0.
if len(s.tables) >= s.db.opt.NumLevelZeroTablesStall {
return false
}

129 changes: 69 additions & 60 deletions levels.go
@@ -306,7 +306,7 @@ func (s *levelsController) dropPrefixes(prefixes [][]byte) error {
// function in logs, and forces a compaction.
dropPrefixes: prefixes,
}
if err := s.doCompact(cp); err != nil {
if err := s.doCompact(174, cp); err != nil {
opt.Warningf("While compacting level 0: %v", err)
return nil
}
@@ -366,11 +366,13 @@ func (s *levelsController) startCompact(lc *y.Closer) {
n := s.kv.opt.NumCompactors
lc.AddRunning(n - 1)
for i := 0; i < n; i++ {
go s.runWorker(lc)
// The worker with id=0 is dedicated to L0 and L1. This is not counted
// towards the user specified NumCompactors.
go s.runCompactor(i, lc)
}
}

func (s *levelsController) runWorker(lc *y.Closer) {
func (s *levelsController) runCompactor(id int, lc *y.Closer) {
defer lc.Done()

randomDelay := time.NewTimer(time.Duration(rand.Int31n(1000)) * time.Millisecond)
@@ -381,7 +383,7 @@ func (s *levelsController) runWorker(lc *y.Closer) {
return
}

ticker := time.NewTicker(time.Second)
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()

for {
@@ -391,7 +393,15 @@ func (s *levelsController) runWorker(lc *y.Closer) {
prios := s.pickCompactLevels()
loop:
for _, p := range prios {
err := s.doCompact(p)
if id == 0 && p.level > 1 {
// If I'm ID zero, I only compact L0 and L1.
continue
}
if id != 0 && p.level <= 1 {
// If I'm ID non-zero, I do NOT compact L0 and L1.
continue
}
err := s.doCompact(id, p)
switch err {
case nil:
break loop
@@ -453,10 +463,11 @@ func (s *levelsController) pickCompactLevels() (prios []compactionPriority) {
prios = append(prios, pri)
}
}
// We used to sort compaction priorities based on the score. But, we
// decided to compact based on the level, not the priority. So, upper
// levels (level 0, level 1, etc) always get compacted first, before the
// lower levels -- this allows us to avoid stalls.
// We should continue to sort the compaction priorities by score. Now that we have a dedicated
// compactor for L0 and L1, we don't need to sort by level here.
sort.Slice(prios, func(i, j int) bool {
return prios[i].score > prios[j].score
})
return prios
}

@@ -541,15 +552,13 @@ nextTable:
// that would affect the snapshot view guarantee provided by transactions.
discardTs := s.kv.orc.discardAtOrBelow()

// Start generating new tables.
type newTableResult struct {
table *table.Table
err error
}
resultCh := make(chan newTableResult)
var numBuilds, numVersions int
var lastKey, skipKey []byte
var vp valuePointer
var newTables []*table.Table
mu := new(sync.Mutex) // Guards newTables

inflightBuilders := y.NewThrottle(5)
for it.Valid() {
timeStart := time.Now()
dk, err := s.kv.registry.latestDataKey()
@@ -646,19 +655,6 @@ nextTable:
// called Add() at least once, and builder is not Empty().
s.kv.opt.Debugf("LOG Compact. Added %d keys. Skipped %d keys. Iteration took: %v",
numKeys, numSkips, time.Since(timeStart))
build := func(fileID uint64) (*table.Table, error) {
fd, err := y.CreateSyncedFile(table.NewFilename(fileID, s.kv.opt.Dir), true)
if err != nil {
return nil, errors.Wrapf(err, "While opening new table: %d", fileID)
}

if _, err := fd.Write(builder.Finish()); err != nil {
return nil, errors.Wrapf(err, "Unable to write to file: %d", fileID)
}
tbl, err := table.OpenTable(fd, bopts)
// decrRef is added below.
return tbl, errors.Wrapf(err, "Unable to open table: %q", fd.Name())
}
if builder.Empty() {
// Cleanup builder resources:
builder.Finish()
@@ -667,49 +663,61 @@
}
numBuilds++
fileID := s.reserveFileID()
if err := inflightBuilders.Do(); err != nil {
// Can't return from here, until I decrRef all the tables that I built so far.
break
}
go func(builder *table.Builder) {
defer builder.Close()
var (
tbl *table.Table
err error
)
defer inflightBuilders.Done(err)

build := func(fileID uint64) (*table.Table, error) {
fd, err := y.CreateSyncedFile(table.NewFilename(fileID, s.kv.opt.Dir), true)
if err != nil {
return nil, errors.Wrapf(err, "While opening new table: %d", fileID)
}

if _, err := fd.Write(builder.Finish()); err != nil {
return nil, errors.Wrapf(err, "Unable to write to file: %d", fileID)
}
tbl, err := table.OpenTable(fd, bopts)
// decrRef is added below.
return tbl, errors.Wrapf(err, "Unable to open table: %q", fd.Name())
}

var tbl *table.Table
var err error
if s.kv.opt.InMemory {
tbl, err = table.OpenInMemoryTable(builder.Finish(), fileID, &bopts)
} else {
tbl, err = build(fileID)
}
resultCh <- newTableResult{tbl, err}
}(builder)
}

newTables := make([]*table.Table, 0, 20)
// Wait for all table builders to finish.
var firstErr error
for x := 0; x < numBuilds; x++ {
res := <-resultCh
newTables = append(newTables, res.table)
if firstErr == nil {
firstErr = res.err
}
// If we couldn't build the table, return fast.
if err != nil {
return
}

mu.Lock()
newTables = append(newTables, tbl)
mu.Unlock()
}(builder)
}

if firstErr == nil {
// Wait for all table builders to finish and also for newTables accumulator to finish.
err := inflightBuilders.Finish()
if err == nil {
// Ensure created files' directory entries are visible. We don't mind the extra latency
// from not doing this ASAP after all file creation has finished because this is a
// background operation.
firstErr = s.kv.syncDir(s.kv.opt.Dir)
err = s.kv.syncDir(s.kv.opt.Dir)
}

if firstErr != nil {
if err != nil {
// An error happened. Delete all the newly created table files (by calling DecrRef
// -- we're the only holders of a ref).
for j := 0; j < numBuilds; j++ {
if newTables[j] != nil {
_ = newTables[j].DecrRef()
}
}
errorReturn := errors.Wrapf(firstErr, "While running compaction for: %+v", cd)
return nil, nil, errorReturn
_ = decrRefs(newTables)
return nil, nil, errors.Wrapf(err, "while running compactions for: %+v", cd)
}

sort.Slice(newTables, func(i, j int) bool {
@@ -963,7 +971,7 @@ func (s *levelsController) runCompactDef(l int, cd compactDef) (err error) {
var errFillTables = errors.New("Unable to fill tables")

// doCompact picks some table on level l and compacts it away to the next level.
func (s *levelsController) doCompact(p compactionPriority) error {
func (s *levelsController) doCompact(id int, p compactionPriority) error {
l := p.level
y.AssertTrue(l+1 < s.kv.opt.MaxLevels) // Sanity check.

@@ -976,7 +984,7 @@ func (s *levelsController) doCompact(p compactionPriority) error {
cd.elog.SetMaxEvents(100)
defer cd.elog.Finish()

s.kv.opt.Infof("Got compaction priority: %+v", p)
s.kv.opt.Debugf("[Compactor: %d] Attempting to run compaction: %+v", id, p)

// While picking tables to be compacted, both levels' tables are expected to
// remain unchanged.
@@ -992,16 +1000,17 @@ func (s *levelsController) doCompact(p compactionPriority) error {
}
defer s.cstatus.delete(cd) // Remove the ranges from compaction status.

s.kv.opt.Infof("Running for level: %d\n", cd.thisLevel.level)
s.kv.opt.Infof("[Compactor: %d] Running compaction: %+v for level: %d\n",
id, p, cd.thisLevel.level)
s.cstatus.toLog(cd.elog)
if err := s.runCompactDef(l, cd); err != nil {
// This compaction couldn't be done successfully.
s.kv.opt.Warningf("LOG Compact FAILED with error: %+v: %+v", err, cd)
s.kv.opt.Warningf("[Compactor: %d] LOG Compact FAILED with error: %+v: %+v", id, err, cd)
return err
}

s.cstatus.toLog(cd.elog)
s.kv.opt.Infof("Compaction for level: %d DONE", cd.thisLevel.level)
s.kv.opt.Infof("[Compactor: %d] Compaction for level: %d DONE", id, cd.thisLevel.level)
return nil
}

Expand All @@ -1025,7 +1034,7 @@ func (s *levelsController) addLevel0Table(t *table.Table) error {
// Stall. Make sure all levels are healthy before we unstall.
var timeStart time.Time
{
s.kv.opt.Debugf("STALLED STALLED STALLED: %v\n", time.Since(s.lastUnstalled))
s.kv.opt.Infof("STALLED STALLED STALLED: %v\n", time.Since(s.lastUnstalled))
s.cstatus.RLock()
for i := 0; i < s.kv.opt.MaxLevels; i++ {
s.kv.opt.Debugf("level=%d. Status=%s Size=%d\n",
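Besides the per-compactor ids, the table-building loop in levels.go changes shape: the resultCh/numBuilds collection pattern is replaced by a mutex-guarded newTables slice plus y.NewThrottle(5), which caps how many table builds run concurrently. Throttle lives in Badger's internal y package; the sketch below only approximates its Do/Done/Finish contract with a buffered channel and a WaitGroup, to show the shape of the pattern rather than the exact implementation.

    package main

    import (
    	"fmt"
    	"sync"
    )

    // throttle approximates y.Throttle: Do blocks while maxInflight goroutines
    // are running, Done frees a slot and records the first error seen, and
    // Finish waits for all of them and returns that error.
    type throttle struct {
    	slots chan struct{}
    	wg    sync.WaitGroup
    	mu    sync.Mutex
    	err   error
    }

    func newThrottle(maxInflight int) *throttle {
    	return &throttle{slots: make(chan struct{}, maxInflight)}
    }

    func (t *throttle) Do() error {
    	t.slots <- struct{}{} // blocks once maxInflight builds are in flight
    	t.wg.Add(1)
    	return nil
    }

    func (t *throttle) Done(err error) {
    	t.mu.Lock()
    	if t.err == nil {
    		t.err = err
    	}
    	t.mu.Unlock()
    	<-t.slots
    	t.wg.Done()
    }

    func (t *throttle) Finish() error {
    	t.wg.Wait()
    	return t.err
    }

    func main() {
    	var (
    		mu     sync.Mutex
    		tables []int // stands in for the mutex-guarded newTables slice
    	)
    	inflight := newThrottle(5)
    	for i := 0; i < 20; i++ {
    		if err := inflight.Do(); err != nil {
    			break // mirror the compaction loop: stop handing out new builds
    		}
    		go func(i int) {
    			var err error
    			defer func() { inflight.Done(err) }()
    			mu.Lock()
    			tables = append(tables, i)
    			mu.Unlock()
    		}(i)
    	}
    	if err := inflight.Finish(); err != nil {
    		fmt.Println("build failed:", err)
    		return
    	}
    	fmt.Println("built", len(tables), "tables")
    }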