From 4c4867de4172bbed6b945dac5732f6d333003b99 Mon Sep 17 00:00:00 2001 From: Damien Tournoud Date: Mon, 22 Jun 2020 08:46:14 -0700 Subject: [PATCH] Rework DB.DropPrefix Fixes three issues with the current implementation: - It can generate compaction requests that break the invariant that bottom tables need to be consecutive (issue #1380) - It performs the same level compactions in increasing order of levels (starting from L0) which leads to old versions of keys for the prefix re-surfacing to active transactions - When you have to drop multiple prefixes, the API forces you to drop one prefix at a time and go through the whole expensive table rewriting multiple times. --- db.go | 16 +++---- levels.go | 133 ++++++++++++++++++++++++++++++++++-------------------- 2 files changed, 92 insertions(+), 57 deletions(-) diff --git a/db.go b/db.go index 2234843f2..0de551cd9 100644 --- a/db.go +++ b/db.go @@ -974,7 +974,7 @@ func buildL0Table(ft flushTask, bopts table.Options) []byte { defer b.Close() var vp valuePointer for iter.SeekToFirst(); iter.Valid(); iter.Next() { - if len(ft.dropPrefix) > 0 && bytes.HasPrefix(iter.Key(), ft.dropPrefix) { + if len(ft.dropPrefixes) > 0 && hasAnyPrefixes(iter.Key(), ft.dropPrefixes) { continue } vs := iter.Value() @@ -987,9 +987,9 @@ func buildL0Table(ft flushTask, bopts table.Options) []byte { } type flushTask struct { - mt *skl.Skiplist - vptr valuePointer - dropPrefix []byte + mt *skl.Skiplist + vptr valuePointer + dropPrefixes [][]byte } // handleFlushTask must be run serially. @@ -1618,7 +1618,7 @@ func (db *DB) dropAll() (func(), error) { // - Compact L0->L1, skipping over Kp. // - Compact rest of the levels, Li->Li, picking tables which have Kp. // - Resume memtable flushes, compactions and writes. -func (db *DB) DropPrefix(prefix []byte) error { +func (db *DB) DropPrefix(prefixes ...[]byte) error { db.opt.Infof("DropPrefix Called") f, err := db.prepareToDrop() if err != nil { @@ -1638,8 +1638,8 @@ func (db *DB) DropPrefix(prefix []byte) error { task := flushTask{ mt: memtable, // Ensure that the head of value log gets persisted to disk. - vptr: db.vhead, - dropPrefix: prefix, + vptr: db.vhead, + dropPrefixes: prefixes, } db.opt.Debugf("Flushing memtable") if err := db.handleFlushTask(task); err != nil { @@ -1654,7 +1654,7 @@ func (db *DB) DropPrefix(prefix []byte) error { db.mt = skl.NewSkiplist(arenaSize(db.opt)) // Drop prefixes from the levels. - if err := db.lc.dropPrefix(prefix); err != nil { + if err := db.lc.dropPrefixes(prefixes); err != nil { return err } db.opt.Infof("DropPrefix done") diff --git a/levels.go b/levels.go index 67908faad..3d1d56a7b 100644 --- a/levels.go +++ b/levels.go @@ -274,9 +274,19 @@ func (s *levelsController) dropTree() (int, error) { // tables who only have keys with this prefix are quickly dropped. The ones which have other keys // are run through MergeIterator and compacted to create new tables. All the mechanisms of // compactions apply, i.e. level sizes and MANIFEST are updated as in the normal flow. -func (s *levelsController) dropPrefix(prefix []byte) error { +func (s *levelsController) dropPrefixes(prefixes [][]byte) error { + // Internal move keys related to the given prefix should also be skipped. + for _, prefix := range prefixes { + key := make([]byte, 0, len(badgerMove)+len(prefix)) + key = append(key, badgerMove...) + key = append(key, prefix...) + prefixes = append(prefixes, key) + } + opt := s.kv.opt - for _, l := range s.levels { + for i := len(s.levels) - 1; i >= 0; i-- { + l := s.levels[i] + l.RLock() if l.level == 0 { size := len(l.tables) @@ -288,7 +298,7 @@ func (s *levelsController) dropPrefix(prefix []byte) error { score: 1.74, // A unique number greater than 1.0 does two things. Helps identify this // function in logs, and forces a compaction. - dropPrefix: prefix, + dropPrefixes: prefixes, } if err := s.doCompact(cp); err != nil { opt.Warningf("While compacting level 0: %v", err) @@ -298,39 +308,48 @@ func (s *levelsController) dropPrefix(prefix []byte) error { continue } - var tables []*table.Table - // Internal move keys related to the given prefix should also be skipped. - moveKeyForPrefix := append(badgerMove, prefix...) - prefixesToSkip := [][]byte{prefix, moveKeyForPrefix} - for _, table := range l.tables { - var absent bool - switch { - case hasAnyPrefixes(table.Smallest(), prefixesToSkip): - case hasAnyPrefixes(table.Biggest(), prefixesToSkip): - case containsAnyPrefixes(table.Smallest(), table.Biggest(), prefixesToSkip): - default: - absent = true + // Build a list of compaction operations affecting all the prefixes we + // need to drop. We need to build operations that satisfy the invariant that + // bottom tables are consecutive. + var operations [][]*table.Table + var currentOperation []*table.Table + + nextOperation := func() { + if len(currentOperation) > 0 { + operations = append(operations, currentOperation) + currentOperation = nil } - if !absent { - tables = append(tables, table) + } + + for _, table := range l.tables { + if containsAnyPrefixes(table.Smallest(), table.Biggest(), prefixes) { + currentOperation = append(currentOperation, table) + } else { + nextOperation() } } + nextOperation() + l.RUnlock() - if len(tables) == 0 { + + if len(operations) == 0 { continue } - cd := compactDef{ - elog: trace.New(fmt.Sprintf("Badger.L%d", l.level), "Compact"), - thisLevel: l, - nextLevel: l, - top: []*table.Table{}, - bot: tables, - dropPrefix: prefix, - } - if err := s.runCompactDef(l.level, cd); err != nil { - opt.Warningf("While running compact def: %+v. Error: %v", cd, err) - return err + opt.Infof("Dropping prefix at level %d (%d operations)", l.level, len(operations)) + for _, operation := range operations { + cd := compactDef{ + elog: trace.New(fmt.Sprintf("Badger.L%d", l.level), "Compact"), + thisLevel: l, + nextLevel: l, + top: nil, + bot: operation, + dropPrefixes: prefixes, + } + if err := s.runCompactDef(l.level, cd); err != nil { + opt.Warningf("While running compact def: %+v. Error: %v", cd, err) + return err + } } } return nil @@ -395,9 +414,9 @@ func (l *levelHandler) isCompactable(delSize int64) bool { } type compactionPriority struct { - level int - score float64 - dropPrefix []byte + level int + score float64 + dropPrefixes [][]byte } // pickCompactLevel determines which level to compact. @@ -491,13 +510,18 @@ func (s *levelsController) compactBuildTables( // Next level has level>=1 and we can use ConcatIterator as key ranges do not overlap. var valid []*table.Table + +nextTable: for _, table := range botTables { - if len(cd.dropPrefix) > 0 && - bytes.HasPrefix(table.Smallest(), cd.dropPrefix) && - bytes.HasPrefix(table.Biggest(), cd.dropPrefix) { - // All the keys in this table have the dropPrefix. So, this table does not need to be - // in the iterator and can be dropped immediately. - continue + if len(cd.dropPrefixes) > 0 { + for _, prefix := range cd.dropPrefixes { + if bytes.HasPrefix(table.Smallest(), prefix) && + bytes.HasPrefix(table.Biggest(), prefix) { + // All the keys in this table have the dropPrefix. So, this table does not need to be + // in the iterator and can be dropped immediately. + continue nextTable + } + } } valid = append(valid, table) } @@ -535,12 +559,9 @@ func (s *levelsController) compactBuildTables( bopts.BfCache = s.kv.bfCache builder := table.NewTableBuilder(bopts) var numKeys, numSkips uint64 - // Internal move keys related to the given prefix should also be skipped. - moveKeyForPrefix := append(badgerMove, cd.dropPrefix...) - prefixesToSkip := [][]byte{cd.dropPrefix, moveKeyForPrefix} for ; it.Valid(); it.Next() { // See if we need to skip the prefix. - if len(cd.dropPrefix) > 0 && hasAnyPrefixes(it.Key(), prefixesToSkip) { + if len(cd.dropPrefixes) > 0 && hasAnyPrefixes(it.Key(), cd.dropPrefixes) { numSkips++ updateStats(it.Value()) continue @@ -719,10 +740,24 @@ func hasAnyPrefixes(s []byte, listOfPrefixes [][]byte) bool { return false } +func containsPrefix(smallValue, largeValue []byte, prefix []byte) bool { + if bytes.HasPrefix(smallValue, prefix) { + return true + } + if bytes.HasPrefix(largeValue, prefix) { + return true + } + if bytes.Compare(prefix, smallValue) > 0 && + bytes.Compare(prefix, largeValue) < 0 { + return true + } + + return false +} + func containsAnyPrefixes(smallValue, largeValue []byte, listOfPrefixes [][]byte) bool { for _, prefix := range listOfPrefixes { - if bytes.Compare(prefix, smallValue) > 0 && - bytes.Compare(prefix, largeValue) < 0 { + if containsPrefix(smallValue, largeValue, prefix) { return true } } @@ -744,7 +779,7 @@ type compactDef struct { thisSize int64 - dropPrefix []byte + dropPrefixes [][]byte } func (cd *compactDef) lockLevels() { @@ -918,10 +953,10 @@ func (s *levelsController) doCompact(p compactionPriority) error { y.AssertTrue(l+1 < s.kv.opt.MaxLevels) // Sanity check. cd := compactDef{ - elog: trace.New(fmt.Sprintf("Badger.L%d", l), "Compact"), - thisLevel: s.levels[l], - nextLevel: s.levels[l+1], - dropPrefix: p.dropPrefix, + elog: trace.New(fmt.Sprintf("Badger.L%d", l), "Compact"), + thisLevel: s.levels[l], + nextLevel: s.levels[l+1], + dropPrefixes: p.dropPrefixes, } cd.elog.SetMaxEvents(100) defer cd.elog.Finish()