Use separate compactors for L0, L1 (hypermodeinc#1466) (hypermodeinc#1468)

Signed-off-by: thomassong <[email protected]>
mYmNeo committed Jan 12, 2023
1 parent 1a899f0 commit 2469606
Showing 4 changed files with 116 additions and 81 deletions.
47 changes: 28 additions & 19 deletions db.go
@@ -31,14 +31,15 @@ import (
"sync/atomic"
"time"

"github.com/dustin/go-humanize"
"github.com/pkg/errors"
"golang.org/x/net/trace"

"github.com/dgraph-io/badger/options"
"github.com/dgraph-io/badger/pb"
"github.com/dgraph-io/badger/skl"
"github.com/dgraph-io/badger/table"
"github.com/dgraph-io/badger/y"
humanize "github.com/dustin/go-humanize"
"github.com/pkg/errors"
"golang.org/x/net/trace"
)

var (
@@ -191,6 +192,12 @@ func (db *DB) replayFunction() func(Entry, valuePointer) error {

// Open returns a new DB object.
func Open(opt Options) (db *DB, err error) {
// It's okay to have zero compactors, which will disable all compactions, but
// we cannot have just one compactor; otherwise we will end up with all data
// on level 2.
if opt.NumCompactors == 1 {
return nil, errors.New("Cannot have 1 compactor. Need at least 2")
}
opt.maxBatchSize = (15 * opt.MaxTableSize) / 100
opt.maxBatchCount = opt.maxBatchSize / int64(skl.MaxNodeSize)
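
For context, a minimal sketch of how a caller would pick a valid NumCompactors under the new rule (the package name, helper name and directory are illustrative, not part of this commit):

package badgerexample

import (
	badger "github.com/dgraph-io/badger"
)

// openWithCompactors opens a DB with an explicit compactor count. Zero
// disables compactions entirely; one is now rejected by Open, because worker
// zero would be pinned to L0/L1 and nothing would serve the deeper levels.
func openWithCompactors(dir string, n int) (*badger.DB, error) {
	opt := badger.DefaultOptions(dir)
	opt.NumCompactors = n // use 0, or 2 and above
	return badger.Open(opt)
}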

Expand Down Expand Up @@ -423,7 +430,7 @@ func (db *DB) close() (err error) {
// Force Compact L0
// We don't need to care about cstatus since no parallel compaction is running.
if db.opt.CompactL0OnClose {
err := db.lc.doCompact(compactionPriority{level: 0, score: 1.73})
err := db.lc.doCompact(173, compactionPriority{level: 0, score: 1.73})
switch err {
case errFillTables:
// This error only means that there might be enough tables to do a compaction. So, we
@@ -767,7 +774,8 @@ func (db *DB) doWrites(lc *y.Closer) {

// batchSet applies a list of badger.Entry. If a request level error occurs it
// will be returned.
//
// Check(kv.BatchSet(entries))
func (db *DB) batchSet(entries []*Entry) error {
req, err := db.sendToWriteCh(entries)
if err != nil {
@@ -780,9 +788,10 @@ func (db *DB) batchSet(entries []*Entry) error {
// batchSetAsync is the asynchronous version of batchSet. It accepts a callback
// function which is called when all the sets are complete. If a request level
// error occurs, it will be passed back via the callback.
//
//   err := kv.BatchSetAsync(entries, func(err error) {
//       Check(err)
//   })
func (db *DB) batchSetAsync(entries []*Entry, f func(error)) error {
req, err := db.sendToWriteCh(entries)
if err != nil {
@@ -1293,7 +1302,7 @@ func (db *DB) Flatten(workers int) error {
errCh := make(chan error, 1)
for i := 0; i < workers; i++ {
go func() {
errCh <- db.lc.doCompact(cp)
errCh <- db.lc.doCompact(175, cp)
}()
}
var success int
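
Flatten is part of the public DB API; a minimal, hypothetical call site (the helper name is made up, the worker count of 2 is arbitrary, and the badger import from the earlier sketch is assumed) could look like:

// flattenBeforeBackup collapses the LSM tree into as few levels as possible,
// e.g. before streaming a backup. Each worker goroutine above repeatedly calls
// doCompact with the computed priority until there is nothing left to move.
func flattenBeforeBackup(db *badger.DB) error {
	return db.Flatten(2)
}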
@@ -1454,16 +1463,16 @@ func (db *DB) dropAll() (func(), error) {
}

// DropPrefix would drop all the keys with the provided prefix. It does this in the following way:
- Stop accepting new writes.
- Stop memtable flushes before acquiring the lock: we're acquiring the lock here,
  and a memtable flush stalling on that lock would lead to deadlock.
- Flush out all memtables, skipping over keys with the given prefix, Kp.
- Write out the value log header to memtables when flushing, so we don't accidentally bring Kp
  back after a restart.
- Stop compaction.
- Compact L0->L1, skipping over Kp.
- Compact rest of the levels, Li->Li, picking tables which have Kp.
- Resume memtable flushes, compactions and writes.
func (db *DB) DropPrefix(prefixes ...[]byte) error {
db.opt.Infof("DropPrefix Called")
f := db.prepareToDrop()
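The DropPrefix contract spelled out above can be exercised with a single call; a sketch (the "user/" prefix and helper name are illustrative, and the earlier badger import is assumed):

// dropUserKeys removes every key beginning with "user/". Per the steps listed
// in the DropPrefix comment, writes and flushes pause, memtables are flushed
// without the prefix, and only the affected tables are compacted.
func dropUserKeys(db *badger.DB) error {
	return db.DropPrefix([]byte("user/"))
}
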
136 changes: 79 additions & 57 deletions levels.go
@@ -29,10 +29,11 @@ import (

"golang.org/x/net/trace"

"github.com/pkg/errors"

"github.com/dgraph-io/badger/pb"
"github.com/dgraph-io/badger/table"
"github.com/dgraph-io/badger/y"
"github.com/pkg/errors"
)

type levelsController struct {
@@ -294,7 +295,7 @@ func (s *levelsController) dropPrefixes(prefixes [][]byte) error {
// function in logs, and forces a compaction.
dropPrefixes: prefixes,
}
if err := s.doCompact(cp); err != nil {
if err := s.doCompact(174, cp); err != nil {
opt.Warningf("While compacting level 0: %v", err)
return nil
}
@@ -354,11 +355,13 @@ func (s *levelsController) startCompact(lc *y.Closer) {
n := s.kv.opt.NumCompactors
lc.AddRunning(n - 1)
for i := 0; i < n; i++ {
go s.runWorker(lc)
// The worker with id=0 is dedicated to L0 and L1. This is not counted
// towards the user specified NumCompactors.
go s.runCompactor(i, lc)
}
}

func (s *levelsController) runWorker(lc *y.Closer) {
func (s *levelsController) runCompactor(id int, lc *y.Closer) {
defer lc.Done()

randomDelay := time.NewTimer(time.Duration(rand.Int31n(1000)) * time.Millisecond)
@@ -369,20 +372,31 @@ func (s *levelsController) runWorker(lc *y.Closer) {
return
}

ticker := time.NewTicker(time.Second)
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()

for {
select {
// Can add a done channel or other stuff.
case <-ticker.C:
prios := s.pickCompactLevels()
loop:
for _, p := range prios {
if err := s.doCompact(p); err == nil {
break
} else if err == errFillTables {
if id == 0 && p.level > 1 {
// If I'm ID zero, I only compact L0 and L1.
continue
}
if id != 0 && p.level <= 1 {
// If I'm ID non-zero, I do NOT compact L0 and L1.
continue
}
err := s.doCompact(id, p)
switch err {
case nil:
break loop
case errFillTables:
// pass
} else {
default:
s.kv.opt.Warningf("While running doCompact: %v\n", err)
}
}
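
The level-assignment rule in the loop above reduces to a single predicate. The helper below is an illustrative paraphrase, not code from this commit:

// canCompact reports whether the compactor with the given id may pick up a
// compaction whose source is the given level: worker 0 serves only L0 and L1,
// so those levels always have a dedicated worker, while every other worker
// serves levels 2 and beyond.
func canCompact(id, level int) bool {
	if id == 0 {
		return level <= 1
	}
	return level > 1
}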
@@ -520,12 +534,11 @@ nextTable:
// that would affect the snapshot view guarantee provided by transactions.
discardTs := s.kv.orc.discardAtOrBelow()

// Start generating new tables.
type newTableResult struct {
table *table.Table
err error
}
resultCh := make(chan newTableResult)
var newTables []*table.Table
mu := new(sync.Mutex) // Guards newTables
inflightBuilders := y.NewThrottle(5)
var vp valuePointer

var numBuilds, numVersions int
var lastKey, skipKey []byte
for it.Valid() {
@@ -605,64 +618,72 @@
}
}
numKeys++
builder.Add(it.Key(), it.Value())
if vs.Meta&bitValuePointer > 0 {
vp.Decode(vs.Value)
}
builder.Add(it.Key(), vs)
}
// It was true that it.Valid() at least once in the loop above, which means we
// called Add() at least once, and builder is not Empty().
s.kv.opt.Debugf("LOG Compact. Added %d keys. Skipped %d keys. Iteration took: %v",
numKeys, numSkips, time.Since(timeStart))
if !builder.Empty() {
numBuilds++
fileID := s.reserveFileID()
go func(builder *table.Builder) {
defer builder.Close()
if builder.Empty() {
continue
}
numBuilds++
fileID := s.reserveFileID()
if err := inflightBuilders.Do(); err != nil {
// Can't return from here, until I decrRef all the tables that I built so far.
break
}

go func(builder *table.Builder, fid uint64) {
var err error

fd, err := y.CreateSyncedFile(table.NewFilename(fileID, s.kv.opt.Dir), true)
defer builder.Close()
defer inflightBuilders.Done(err)

build := func(fileID uint64) (*table.Table, error) {
fd, err := y.CreateSyncedFile(table.NewFilename(fid, s.kv.opt.Dir), true)
if err != nil {
resultCh <- newTableResult{nil, errors.Wrapf(err, "While opening new table: %d", fileID)}
return
return nil, errors.Wrapf(err, "While opening new table: %d", fid)
}

if _, err := fd.Write(builder.Finish()); err != nil {
resultCh <- newTableResult{nil, errors.Wrapf(err, "Unable to write to file: %d", fileID)}
return
return nil, errors.Wrapf(err, "Unable to write to file: %d", fid)
}

tbl, err := table.OpenTable(fd, s.kv.opt.TableLoadingMode, nil)
newTbl, err := table.OpenTable(fd, s.kv.opt.TableLoadingMode, nil)
// decrRef is added below.
resultCh <- newTableResult{tbl, errors.Wrapf(err, "Unable to open table: %q", fd.Name())}
}(builder)
}
}
return newTbl, errors.Wrapf(err, "Unable to open table: %q", fd.Name())
}

newTables := make([]*table.Table, 0, 20)
// Wait for all table builders to finish.
var firstErr error
for x := 0; x < numBuilds; x++ {
res := <-resultCh
newTables = append(newTables, res.table)
if firstErr == nil {
firstErr = res.err
}
var tbl *table.Table
tbl, err = build(fid)
// If we couldn't build the table, return fast.
if err != nil {
return
}

mu.Lock()
newTables = append(newTables, tbl)
mu.Unlock()
}(builder, fileID)
}

if firstErr == nil {
// Wait for all table builders to finish and also for newTables accumulator to finish.
err := inflightBuilders.Finish()
if err == nil {
// Ensure created files' directory entries are visible. We don't mind the extra latency
// from not doing this ASAP after all file creation has finished because this is a
// background operation.
firstErr = syncDir(s.kv.opt.Dir)
err = syncDir(s.kv.opt.Dir)
}

if firstErr != nil {
if err != nil {
// An error happened. Delete all the newly created table files (by calling DecrRef
// -- we're the only holders of a ref).
for j := 0; j < numBuilds; j++ {
if newTables[j] != nil {
_ = newTables[j].DecrRef()
}
}
errorReturn := errors.Wrapf(firstErr, "While running compaction for: %+v", cd)
return nil, nil, errorReturn
_ = decrRefs(newTables)
return nil, nil, errors.Wrapf(err, "while running compactions for: %+v", cd)
}

sort.Slice(newTables, func(i, j int) bool {
@@ -901,7 +922,7 @@ func (s *levelsController) runCompactDef(l int, cd compactDef) (err error) {
var errFillTables = errors.New("Unable to fill tables")

// doCompact picks some table on level l and compacts it away to the next level.
func (s *levelsController) doCompact(p compactionPriority) error {
func (s *levelsController) doCompact(id int, p compactionPriority) error {
l := p.level
y.AssertTrue(l+1 < s.kv.opt.MaxLevels) // Sanity check.

@@ -914,7 +935,7 @@ func (s *levelsController) doCompact(p compactionPriority) error {
cd.elog.SetMaxEvents(100)
defer cd.elog.Finish()

s.kv.opt.Infof("Got compaction priority: %+v", p)
s.kv.opt.Debugf("[Compactor: %d] Attempting to run compaction: %+v", id, p)

// While picking tables to be compacted, both levels' tables are expected to
// remain unchanged.
@@ -930,16 +951,17 @@ func (s *levelsController) doCompact(p compactionPriority) error {
}
defer s.cstatus.delete(cd) // Remove the ranges from compaction status.

s.kv.opt.Infof("Running for level: %d\n", cd.thisLevel.level)
s.kv.opt.Infof("[Compactor: %d] Running compaction: %+v for level: %d\n",
id, p, cd.thisLevel.level)
s.cstatus.toLog(cd.elog)
if err := s.runCompactDef(l, cd); err != nil {
// This compaction couldn't be done successfully.
s.kv.opt.Warningf("LOG Compact FAILED with error: %+v: %+v", err, cd)
s.kv.opt.Warningf("[Compactor: %d] LOG Compact FAILED with error: %+v: %+v", id, err, cd)
return err
}

s.cstatus.toLog(cd.elog)
s.kv.opt.Infof("Compaction for level: %d DONE", cd.thisLevel.level)
s.kv.opt.Infof("[Compactor: %d] Compaction for level: %d DONE", id, cd.thisLevel.level)
return nil
}

@@ -959,7 +981,7 @@ func (s *levelsController) addLevel0Table(t *table.Table) error {
// Stall. Make sure all levels are healthy before we unstall.
var timeStart time.Time
{
s.elog.Printf("STALLED STALLED STALLED: %v\n", time.Since(lastUnstalled))
s.kv.opt.Infof("STALLED STALLED STALLED: %v\n", time.Since(lastUnstalled))
s.cstatus.RLock()
for i := 0; i < s.kv.opt.MaxLevels; i++ {
s.elog.Printf("level=%d. Status=%s Size=%d\n",
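In compactBuildTables, the unbounded result channel is replaced by y.Throttle, which caps in-flight table builders at five. Below is a stand-alone sketch of that pattern using the same Do/Done/Finish calls that appear in this diff; the jobs slice is a stand-in for the per-table builder goroutines:

package badgerexample

import "github.com/dgraph-io/badger/y"

// runThrottled runs jobs with at most five in flight and returns the first error.
func runThrottled(jobs []func() error) error {
	throttle := y.NewThrottle(5)
	for _, job := range jobs {
		// Do blocks until a slot is free, and fails fast if an earlier
		// goroutine has already reported an error via Done.
		if err := throttle.Do(); err != nil {
			break
		}
		go func(job func() error) {
			// Done releases the slot and records this goroutine's result.
			throttle.Done(job())
		}(job)
	}
	// Finish waits for every outstanding goroutine and surfaces the first error.
	return throttle.Finish()
}
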
5 changes: 3 additions & 2 deletions options.go
@@ -100,7 +100,7 @@ func DefaultOptions(path string) Options {
MaxTableSize: 64 << 20,
NumCompactors: 2, // Compactions can be expensive. Only run 2.
NumLevelZeroTables: 5,
NumLevelZeroTablesStall: 10,
NumLevelZeroTablesStall: 15,
NumMemtables: 5,
SyncWrites: true,
NumVersionsToKeep: 1,
@@ -262,7 +262,8 @@ func (opt Options) WithMaxTableSize(val int64) Options {
//
// LevelSizeMultiplier sets the ratio between the maximum sizes of contiguous levels in the LSM.
// Once a level grows to be larger than this ratio allowed, the compaction process will be
// triggered.
//
// triggered.
//
// The default value of LevelSizeMultiplier is 10.
func (opt Options) WithLevelSizeMultiplier(val int) Options {
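The defaults above can also be overridden through the fluent With* builders; a hypothetical configuration under the new defaults (values are illustrative, not recommendations; the earlier badger import is assumed):

// newOptions starts from the new defaults (NumCompactors=2,
// NumLevelZeroTablesStall=15) and overrides the table size and level ratio.
func newOptions(dir string) badger.Options {
	return badger.DefaultOptions(dir).
		WithMaxTableSize(64 << 20).
		WithLevelSizeMultiplier(10)
}
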
9 changes: 6 additions & 3 deletions value_test.go
@@ -28,11 +28,12 @@ import (
"testing"
"time"

"github.com/dgraph-io/badger/options"
"github.com/dgraph-io/badger/y"
humanize "github.com/dustin/go-humanize"
"github.com/dustin/go-humanize"
"github.com/stretchr/testify/require"
"golang.org/x/net/trace"

"github.com/dgraph-io/badger/options"
"github.com/dgraph-io/badger/y"
)

func TestValueBasic(t *testing.T) {
@@ -482,6 +483,8 @@ func TestPersistLFDiscardStats(t *testing.T) {
err = db.Close()
require.NoError(t, err)

// Avoid running compactors on reopening badger.
opt.NumCompactors = 0
db, err = Open(opt)
require.NoError(t, err)
defer db.Close()
