Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BREAKING] opt(compactions): Improve compaction performance #1574

Merged
merged 24 commits into from
Oct 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
abf52b4
Some option changes to get started with compaction
manishrjain Oct 15, 2020
38d48b2
Merge branch 'master' into mrjn/compactions
manishrjain Oct 15, 2020
5c07e45
Merge branch 'master' into mrjn/compactions
manishrjain Oct 15, 2020
dc23b42
Compiles, but gets stuck when loading.
manishrjain Oct 15, 2020
bddedf5
debug
manishrjain Oct 20, 2020
651ee86
Fix bug in compaction. Do not run compaction for the same level multi…
Oct 20, 2020
2e60bf2
Working code
manishrjain Oct 20, 2020
2cc41b5
Working sub-compactions
manishrjain Oct 21, 2020
8a041c9
work
manishrjain Oct 21, 2020
ce96292
Some more changes
manishrjain Oct 22, 2020
473e48b
Fix tests
Oct 22, 2020
af06f7e
More benchmarking
manishrjain Oct 23, 2020
be07ce1
Add comments. Revert flatten.go
manishrjain Oct 23, 2020
8a32382
Increase num compactors. Reserve one for L0. Permanently enable compa…
manishrjain Oct 23, 2020
8f564a5
Use default compression in write bench
manishrjain Oct 23, 2020
e6543c1
Make zero prefer L0 instead of only compacting L0 exclusively.
manishrjain Oct 23, 2020
a86dcaf
Make zero-th compactor run L0->L0. Don't do L0->Lbase until adjusted …
manishrjain Oct 23, 2020
c21d5bb
Create one file for L0->L0 compaction.
manishrjain Oct 24, 2020
b7c302b
Don't print
manishrjain Oct 24, 2020
86d48a8
Simplify level 0 check
manishrjain Oct 26, 2020
7ccdc49
Merge branch 'master' into mrjn/compactions
manishrjain Oct 26, 2020
88be5af
Update ristretto
manishrjain Oct 26, 2020
84805a9
Self review
manishrjain Oct 26, 2020
6401968
Some touch-ups.
manishrjain Oct 26, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion badger/cmd/bank.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ func runTest(cmd *cobra.Command, args []string) error {
// Open DB
opts := badger.DefaultOptions(sstDir).
WithValueDir(vlogDir).
WithMaxTableSize(4 << 20). // Force more compactions.
WithBaseTableSize(4 << 20). // Force more compactions.
WithNumLevelZeroTables(2).
WithNumMemtables(2).
// Do not GC any versions, because we need them for the disect..
Expand Down
28 changes: 16 additions & 12 deletions badger/cmd/write_bench.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ var (
vlogMaxEntries uint32
loadBloomsOnOpen bool
detectConflicts bool
compression bool
zstdComp bool
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Avoid global variables to improve readability and reduce complexity

View Rule

showDir bool
ttlDuration string
showKeysCount bool
Expand Down Expand Up @@ -113,8 +113,8 @@ func init() {
"Load Bloom filter on DB open.")
writeBenchCmd.Flags().BoolVar(&detectConflicts, "conficts", false,
"If true, it badger will detect the conflicts")
writeBenchCmd.Flags().BoolVar(&compression, "compression", true,
"If true, badger will use ZSTD mode")
writeBenchCmd.Flags().BoolVar(&zstdComp, "zstd", false,
"If true, badger will use ZSTD mode. Otherwise, use default.")
writeBenchCmd.Flags().BoolVar(&showDir, "show-dir", false,
"If true, the report will include the directory contents")
writeBenchCmd.Flags().StringVar(&dropAllPeriod, "dropall", "0s",
Expand Down Expand Up @@ -260,12 +260,6 @@ func writeSorted(db *badger.DB, num uint64) error {
}

func writeBench(cmd *cobra.Command, args []string) error {
var cmode options.CompressionType
if compression {
cmode = options.ZSTD
} else {
cmode = options.None
}
opt := badger.DefaultOptions(sstDir).
WithValueDir(vlogDir).
WithSyncWrites(syncWrites).
Expand All @@ -277,8 +271,10 @@ func writeBench(cmd *cobra.Command, args []string) error {
WithValueLogMaxEntries(vlogMaxEntries).
WithEncryptionKey([]byte(encryptionKey)).
WithDetectConflicts(detectConflicts).
WithCompression(cmode).
WithLoggingLevel(badger.INFO)
if zstdComp {
opt = opt.WithCompression(options.ZSTD)
}

if !showLogs {
opt = opt.WithLogger(nil)
Expand Down Expand Up @@ -314,6 +310,7 @@ func writeBench(cmd *cobra.Command, args []string) error {
}

c.SignalAndWait()
fmt.Printf(db.LevelsToString())
return err
}

Expand Down Expand Up @@ -354,11 +351,13 @@ func reportStats(c *z.Closer, db *badger.DB) {
t := time.NewTicker(time.Second)
defer t.Stop()

var count int
for {
select {
case <-c.HasBeenClosed():
return
case <-t.C:
count++
if showKeysCount {
showKeysStats(db)
}
Expand Down Expand Up @@ -392,8 +391,13 @@ func reportStats(c *z.Closer, db *badger.DB) {
bytesRate := sz / uint64(dur.Seconds())
entriesRate := entries / uint64(dur.Seconds())
fmt.Printf("[WRITE] Time elapsed: %s, bytes written: %s, speed: %s/sec, "+
"entries written: %d, speed: %d/sec, gcSuccess: %d\n", y.FixedDuration(time.Since(startTime)),
humanize.Bytes(sz), humanize.Bytes(bytesRate), entries, entriesRate, gcSuccess)
"entries written: %d, speed: %d/sec, Memory: %s\n",
y.FixedDuration(time.Since(startTime)),
humanize.Bytes(sz), humanize.Bytes(bytesRate), entries, entriesRate,
humanize.IBytes(uint64(z.NumAllocBytes())))
if count%10 == 0 {
fmt.Printf(db.LevelsToString())
}
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,15 @@ func (wb *WriteBatch) Cancel() {
wb.txn.Discard()
}

// The caller of this callback must hold the lock.
func (wb *WriteBatch) callback(err error) {
// sync.WaitGroup is thread-safe, so it doesn't need to be run inside wb.Lock.
defer wb.throttle.Done(err)
if err == nil {
return
}

wb.Lock()
defer wb.Unlock()
if wb.err != nil {
return
}
Expand Down
57 changes: 45 additions & 12 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ type keyRange struct {
inf bool
}

func (r keyRange) isEmpty() bool {
return len(r.left) == 0 && len(r.right) == 0 && !r.inf
}

var infRange = keyRange{inf: true}

func (r keyRange) String() string {
Expand All @@ -45,7 +49,26 @@ func (r keyRange) equals(dst keyRange) bool {
r.inf == dst.inf
}

func (r *keyRange) extend(kr keyRange) {
if r.isEmpty() {
*r = kr
}
if len(r.left) == 0 || y.CompareKeys(kr.left, r.left) < 0 {
r.left = kr.left
}
if len(r.right) == 0 || y.CompareKeys(kr.right, r.right) > 0 {
r.right = kr.right
}
if kr.inf {
r.inf = true
}
}

func (r keyRange) overlapsWith(dst keyRange) bool {
// Empty keyRange always overlaps.
if r.isEmpty() {
return true
}
if r.inf || dst.inf {
return true
}
Expand Down Expand Up @@ -127,6 +150,7 @@ func (lcs *levelCompactStatus) remove(dst keyRange) bool {
type compactStatus struct {
sync.RWMutex
levels []*levelCompactStatus
tables map[uint64]struct{}
}

func (cs *compactStatus) overlapsWith(level int, this keyRange) bool {
Expand All @@ -151,11 +175,10 @@ func (cs *compactStatus) compareAndAdd(_ thisAndNextLevelRLocked, cd compactDef)
cs.Lock()
defer cs.Unlock()

level := cd.thisLevel.level

y.AssertTruef(level < len(cs.levels)-1, "Got level %d. Max levels: %d", level, len(cs.levels))
thisLevel := cs.levels[level]
nextLevel := cs.levels[level+1]
tl := cd.thisLevel.level
y.AssertTruef(tl < len(cs.levels)-1, "Got level %d. Max levels: %d", tl, len(cs.levels))
thisLevel := cs.levels[cd.thisLevel.level]
nextLevel := cs.levels[cd.nextLevel.level]

if thisLevel.overlapsWith(cd.thisRange) {
return false
Expand All @@ -171,31 +194,41 @@ func (cs *compactStatus) compareAndAdd(_ thisAndNextLevelRLocked, cd compactDef)
thisLevel.ranges = append(thisLevel.ranges, cd.thisRange)
nextLevel.ranges = append(nextLevel.ranges, cd.nextRange)
thisLevel.delSize += cd.thisSize
for _, t := range append(cd.top, cd.bot...) {
cs.tables[t.ID()] = struct{}{}
}
return true
}

func (cs *compactStatus) delete(cd compactDef) {
cs.Lock()
defer cs.Unlock()

level := cd.thisLevel.level
y.AssertTruef(level < len(cs.levels)-1, "Got level %d. Max levels: %d", level, len(cs.levels))
tl := cd.thisLevel.level
y.AssertTruef(tl < len(cs.levels)-1, "Got level %d. Max levels: %d", tl, len(cs.levels))

thisLevel := cs.levels[level]
nextLevel := cs.levels[level+1]
thisLevel := cs.levels[cd.thisLevel.level]
nextLevel := cs.levels[cd.nextLevel.level]

thisLevel.delSize -= cd.thisSize
found := thisLevel.remove(cd.thisRange)
found = nextLevel.remove(cd.nextRange) && found
if !cd.nextRange.isEmpty() {
found = nextLevel.remove(cd.nextRange) && found
}

if !found {
this := cd.thisRange
next := cd.nextRange
fmt.Printf("Looking for: [%q, %q, %v] in this level.\n", this.left, this.right, this.inf)
fmt.Printf("Looking for: %s in this level %d.\n", this, tl)
fmt.Printf("This Level:\n%s\n", thisLevel.debug())
fmt.Println()
fmt.Printf("Looking for: [%q, %q, %v] in next level.\n", next.left, next.right, next.inf)
fmt.Printf("Looking for: %s in next level %d.\n", next, cd.nextLevel.level)
fmt.Printf("Next Level:\n%s\n", nextLevel.debug())
log.Fatal("keyRange not found")
}
for _, t := range append(cd.top, cd.bot...) {
_, ok := cs.tables[t.ID()]
y.AssertTrue(ok)
delete(cs.tables, t.ID())
}
}
Loading