From a1bae270e3cc759675a899355bc6ecf00fe4135e Mon Sep 17 00:00:00 2001 From: NamanJain8 Date: Thu, 29 Apr 2021 18:32:54 +0530 Subject: [PATCH 1/7] add DropPrefixNonBlocking API --- db.go | 83 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 80 insertions(+), 3 deletions(-) diff --git a/db.go b/db.go index 09fc432cc..33f6f34fc 100644 --- a/db.go +++ b/db.go @@ -1036,10 +1036,9 @@ func (db *DB) HandoverSkiplist(skl *skl.Skiplist, callback func()) error { // Iterate over the skiplist and send the entries to the publisher. it := skl.NewIterator() - it.SeekToFirst() var entries []*Entry - for it.Valid() { + for it.SeekToFirst(); it.Valid(); it.Next() { v := it.Value() e := &Entry{ Key: it.Key(), @@ -1048,7 +1047,6 @@ func (db *DB) HandoverSkiplist(skl *skl.Skiplist, callback func()) error { UserMeta: v.UserMeta, } entries = append(entries, e) - it.Next() } req := &request{ Entries: entries, @@ -1836,6 +1834,85 @@ func (db *DB) dropAll() (func(), error) { return resume, nil } +// DropPrefixNonBlocking would logically drop all the keys with the provided prefix. The data would +// not be cleared from LSM tree immediately. It would be deleted eventually through compactions. +// This operation is useful when we don't want to block writes while we delete the prefixes. +// It does this in the following way: +// - Stream the given prefixes at a given timestamp. +// - Write them to skiplist and handover that skiplist to DB. +func (db *DB) DropPrefixNonBlocking(readTs uint64, prefixes ...[]byte) error { + if db.opt.ReadOnly { + panic("Attempting to drop data in read-only mode.") + } + + if len(prefixes) == 0 { + return nil + } + db.opt.Infof("Non-blocking DropPrefix called for %s", prefixes) + + dropPrefix := func(prefix []byte) error { + stream := db.NewStreamAt(readTs) + stream.LogPrefix = fmt.Sprintf("Dropping prefix: %#x", prefix) + stream.Prefix = prefix + // Use the default implementation with some changes. We don't need anything except key. + stream.KeyToList = func(key []byte, itr *Iterator) (*pb.KVList, error) { + a := itr.Alloc + ka := a.Copy(key) + + list := &pb.KVList{} + for ; itr.Valid(); itr.Next() { + item := itr.Item() + if item.IsDeletedOrExpired() { + break + } + if !bytes.Equal(key, item.Key()) { + // Break out on the first encounter with another key. + break + } + + kv := y.NewKV(a) + kv.Key = ka + list.Kv = append(list.Kv, kv) + + if db.opt.NumVersionsToKeep == 1 { + break + } + + if item.DiscardEarlierVersions() { + break + } + } + return list, nil + } + stream.Send = func(buf *z.Buffer) error { + // Stream framework already batches the key values. + b := skl.NewBuilder(1 << 20) // TODO: Maybe figure out the optimal value. + err := buf.SliceIterate(func(s []byte) error { + var kv pb.KV + if err := kv.Unmarshal(s); err != nil { + return err + } + b.Add(y.KeyWithTs(kv.Key, readTs), y.ValueStruct{Meta: bitDelete}) + return nil + }) + if err != nil { + return err + } + return db.HandoverSkiplist(b.Skiplist(), nil) + } + + return stream.Orchestrate(context.Background()) + } + + // Iterate over all the prefixes and logically drop them. + for _, prefix := range prefixes { + if err := dropPrefix(prefix); err != nil { + return errors.Wrapf(err, "While dropping prefix: %#x", prefix) + } + } + return nil +} + // DropPrefix would drop all the keys with the provided prefix. It does this in the following way: // - Stop accepting new writes. // - Stop memtable flushes before acquiring lock. Because we're acquring lock here From fbad83a6dcca4213bc58f5054a59f26f6492f31c Mon Sep 17 00:00:00 2001 From: Rohan Prasad Date: Fri, 30 Apr 2021 14:00:19 +0530 Subject: [PATCH 2/7] Add test for Drop prefix non blocking --- db2_test.go | 97 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 97 insertions(+) diff --git a/db2_test.go b/db2_test.go index d6c86ec6f..4f7c3cae0 100644 --- a/db2_test.go +++ b/db2_test.go @@ -31,6 +31,7 @@ import ( "regexp" "runtime" "sync" + "sync/atomic" "testing" "time" @@ -1055,3 +1056,99 @@ func TestKeyCount(t *testing.T) { require.NoError(t, stream.Orchestrate(context.Background())) require.Equal(t, N, uint64(count)) } + +func TestDropPrefixNonBlocking(t *testing.T) { + dir, err := ioutil.TempDir("", "badger-test") + require.NoError(t, err) + defer removeDir(dir) + + db, err := OpenManaged(DefaultOptions(dir)) + require.NoError(t, err) + defer db.Close() + + val := []byte("value") + + // Insert key-values + write := func() { + txn := db.NewTransactionAt(1, true) + defer txn.Discard() + require.NoError(t, txn.Set([]byte("aaa"), val)) + require.NoError(t, txn.Set([]byte("aab"), val)) + require.NoError(t, txn.Set([]byte("aba"), val)) + require.NoError(t, txn.Set([]byte("aca"), val)) + require.NoError(t, txn.CommitAt(2, nil)) + } + + read := func() { + txn := db.NewTransactionAt(6, false) + iterOpts := DefaultIteratorOptions + iterOpts.Prefix = []byte("aa") + it := txn.NewIterator(iterOpts) + defer it.Close() + + cnt := 0 + for it.Rewind(); it.Valid(); it.Next() { + cnt++ + } + + require.Equal(t, 0, cnt) + } + + write() + prefixes := [][]byte{[]byte("aa")} + require.NoError(t, db.DropPrefixNonBlocking(5, prefixes...)) + read() + + // Writing again at same timestamp and verifying that vlog rewrites don't allow us to read + // these entries anyway. + write() + read() +} + +func TestDropPrefixNonBlockingNoError(t *testing.T) { + dir, err := ioutil.TempDir("", "badger-test") + require.NoError(t, err) + defer removeDir(dir) + + opt := DefaultOptions(dir) + db, err := OpenManaged(opt) + require.NoError(t, err) + defer db.Close() + + clock := uint64(1) + + writer := func(db *DB, shouldFail bool, closer *z.Closer) { + val := []byte("value") + defer closer.Done() + // Insert key-values + for { + select { + case <-closer.HasBeenClosed(): + return + default: + txn := db.NewTransactionAt(atomic.AddUint64(&clock, 1), true) + require.NoError(t, txn.SetEntry(NewEntry([]byte("aaa"), val))) + + err := txn.CommitAt(atomic.AddUint64(&clock, 1), nil) + if shouldFail && err != nil { + require.Error(t, err, ErrBlockedWrites) + } else if !shouldFail { + require.NoError(t, err) + } + } + } + } + + closer := z.NewCloser(1) + go writer(db, true, closer) + time.Sleep(time.Millisecond * 100) + require.NoError(t, db.DropPrefix([]byte("aa"))) + closer.SignalAndWait() + + closer2 := z.NewCloser(1) + go writer(db, false, closer2) + time.Sleep(time.Millisecond * 50) + prefixes := [][]byte{[]byte("aa")} + require.NoError(t, db.DropPrefixNonBlocking(atomic.AddUint64(&clock, 1), prefixes...)) + closer2.SignalAndWait() +} From 1b0fc7c06579afb18c11a64c0cd6c53f5c4fc721 Mon Sep 17 00:00:00 2001 From: NamanJain8 Date: Fri, 30 Apr 2021 18:14:40 +0530 Subject: [PATCH 3/7] fixes --- db.go | 52 +++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 41 insertions(+), 11 deletions(-) diff --git a/db.go b/db.go index 33f6f34fc..beb6ed197 100644 --- a/db.go +++ b/db.go @@ -763,6 +763,8 @@ var requestPool = sync.Pool{ } func (db *DB) writeToLSM(b *request) error { + db.lock.RLock() + defer db.lock.RUnlock() for i, entry := range b.Entries { var err error if db.opt.managedTxns || entry.skipVlogAndSetThreshold(db.valueThreshold()) { @@ -1838,11 +1840,11 @@ func (db *DB) dropAll() (func(), error) { // not be cleared from LSM tree immediately. It would be deleted eventually through compactions. // This operation is useful when we don't want to block writes while we delete the prefixes. // It does this in the following way: -// - Stream the given prefixes at a given timestamp. -// - Write them to skiplist and handover that skiplist to DB. -func (db *DB) DropPrefixNonBlocking(readTs uint64, prefixes ...[]byte) error { +// - Stream the given prefixes at a given ts. +// - Write them to skiplist at the specified ts and handover that skiplist to DB. +func (db *DB) DropPrefixNonBlocking(ts uint64, prefixes ...[]byte) error { if db.opt.ReadOnly { - panic("Attempting to drop data in read-only mode.") + return errors.New("Attempting to drop data in read-only mode.") } if len(prefixes) == 0 { @@ -1851,7 +1853,7 @@ func (db *DB) DropPrefixNonBlocking(readTs uint64, prefixes ...[]byte) error { db.opt.Infof("Non-blocking DropPrefix called for %s", prefixes) dropPrefix := func(prefix []byte) error { - stream := db.NewStreamAt(readTs) + stream := db.NewStreamAt(ts) stream.LogPrefix = fmt.Sprintf("Dropping prefix: %#x", prefix) stream.Prefix = prefix // Use the default implementation with some changes. We don't need anything except key. @@ -1884,24 +1886,52 @@ func (db *DB) DropPrefixNonBlocking(readTs uint64, prefixes ...[]byte) error { } return list, nil } + + var wg sync.WaitGroup + builderMap := make(map[uint32]*skl.Builder) + initSize := int64(float64(db.opt.MemTableSize) * 1.1) + + handover := func(force bool) error { + for id, b := range builderMap { + sl := b.Skiplist() + if force || sl.MemSize() > db.opt.MemTableSize { + wg.Add(1) + if err := db.HandoverSkiplist(sl, wg.Done); err != nil { + return err + } + // Create a fresh builder. + builderMap[id] = skl.NewBuilder(initSize) + } + } + return nil + } + stream.Send = func(buf *z.Buffer) error { - // Stream framework already batches the key values. - b := skl.NewBuilder(1 << 20) // TODO: Maybe figure out the optimal value. err := buf.SliceIterate(func(s []byte) error { var kv pb.KV if err := kv.Unmarshal(s); err != nil { return err } - b.Add(y.KeyWithTs(kv.Key, readTs), y.ValueStruct{Meta: bitDelete}) + if _, ok := builderMap[kv.StreamId]; !ok { + builderMap[kv.StreamId] = skl.NewBuilder(initSize) + } + builderMap[kv.StreamId].Add(y.KeyWithTs(kv.Key, ts), y.ValueStruct{Meta: bitDelete}) return nil }) if err != nil { return err } - return db.HandoverSkiplist(b.Skiplist(), nil) + return handover(false) } - - return stream.Orchestrate(context.Background()) + if err := stream.Orchestrate(context.Background()); err != nil { + return err + } + // Flush the remaining skiplists if any. + if err := handover(true); err != nil { + return err + } + wg.Wait() + return nil } // Iterate over all the prefixes and logically drop them. From c63236a2530ea867c86d5dfc3cfec4a463e0c5be Mon Sep 17 00:00:00 2001 From: NamanJain8 Date: Mon, 3 May 2021 13:27:57 +0530 Subject: [PATCH 4/7] fix allocated size assertion --- table/builder.go | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/table/builder.go b/table/builder.go index 8322bb86f..55166cd11 100644 --- a/table/builder.go +++ b/table/builder.go @@ -153,6 +153,16 @@ func NewTableBuilder(opts Options) *Builder { return b } +func maxEncodedLen(ctype options.CompressionType, sz int) int { + switch ctype { + case options.Snappy: + return snappy.MaxEncodedLen(sz) + case options.ZSTD: + return y.ZSTDCompressBound(sz) + } + return sz +} + func (b *Builder) handleBlock() { defer b.wg.Done() @@ -175,7 +185,7 @@ func (b *Builder) handleBlock() { // BlockBuf should always less than or equal to allocated space. If the blockBuf is greater // than allocated space that means the data from this block cannot be stored in its // existing location. - allocatedSpace := (item.end) + padding + 1 + allocatedSpace := maxEncodedLen(b.opts.Compression, (item.end)) + padding + 1 y.AssertTrue(len(blockBuf) <= allocatedSpace) // blockBuf was allocated on allocator. So, we don't need to copy it over. From b14dfefe5a4f8b6d0ddcea5c5da5632d0083152a Mon Sep 17 00:00:00 2001 From: NamanJain8 Date: Tue, 4 May 2021 00:28:42 +0530 Subject: [PATCH 5/7] address manish's comments --- db.go | 140 ++++++++++++++++++++++++++++------------------------ db2_test.go | 11 +++-- options.go | 18 +++++++ 3 files changed, 100 insertions(+), 69 deletions(-) diff --git a/db.go b/db.go index beb6ed197..c1ecdd87f 100644 --- a/db.go +++ b/db.go @@ -1842,7 +1842,7 @@ func (db *DB) dropAll() (func(), error) { // It does this in the following way: // - Stream the given prefixes at a given ts. // - Write them to skiplist at the specified ts and handover that skiplist to DB. -func (db *DB) DropPrefixNonBlocking(ts uint64, prefixes ...[]byte) error { +func (db *DB) DropPrefixNonBlocking(prefixes ...[]byte) error { if db.opt.ReadOnly { return errors.New("Attempting to drop data in read-only mode.") } @@ -1852,86 +1852,84 @@ func (db *DB) DropPrefixNonBlocking(ts uint64, prefixes ...[]byte) error { } db.opt.Infof("Non-blocking DropPrefix called for %s", prefixes) + cbuf := z.NewBuffer(int(db.opt.MemTableSize), "DropPrefixNonBlocking") + defer cbuf.Release() + + var wg sync.WaitGroup + handover := func(force bool) error { + if !force && int64(cbuf.LenNoPadding()) < db.opt.MemTableSize { + return nil + } + + var kvs []*pb.KV + err := cbuf.SliceIterate(func(slice []byte) error { + var kv pb.KV + if err := kv.Unmarshal(slice); err != nil { + return err + } + kvs = append(kvs, &kv) + return nil + }) + if err != nil { + return err + } + cbuf.Reset() + + // TODO: Maybe move the rest of it to a separate go routine, if it is slow. + // Sort the kvs, add them to the builder, and hand it over to DB. + b := skl.NewBuilder(db.opt.MemTableSize) + sort.Slice(kvs, func(i, j int) bool { + return bytes.Compare(kvs[i].Key, kvs[j].Key) < 0 + }) + for _, kv := range kvs { + b.Add(y.KeyWithTs(kv.Key, kv.Version+1), y.ValueStruct{Meta: bitDelete}) + } + wg.Add(1) + return db.HandoverSkiplist(b.Skiplist(), wg.Done) + } + dropPrefix := func(prefix []byte) error { - stream := db.NewStreamAt(ts) + stream := db.NewStreamAt(math.MaxUint64) stream.LogPrefix = fmt.Sprintf("Dropping prefix: %#x", prefix) stream.Prefix = prefix - // Use the default implementation with some changes. We don't need anything except key. + // We don't need anything except key and version. stream.KeyToList = func(key []byte, itr *Iterator) (*pb.KVList, error) { + if !itr.Valid() { + return nil, nil + } + item := itr.Item() + if item.IsDeletedOrExpired() { + return nil, nil + } + if !bytes.Equal(key, item.Key()) { + // Return on the encounter with another key. + return nil, nil + } + a := itr.Alloc ka := a.Copy(key) - list := &pb.KVList{} - for ; itr.Valid(); itr.Next() { - item := itr.Item() - if item.IsDeletedOrExpired() { - break - } - if !bytes.Equal(key, item.Key()) { - // Break out on the first encounter with another key. - break - } - - kv := y.NewKV(a) - kv.Key = ka - list.Kv = append(list.Kv, kv) - - if db.opt.NumVersionsToKeep == 1 { - break - } - - if item.DiscardEarlierVersions() { - break - } - } + // We need to generate only a single delete marker per key. All the versions for this + // key will be considered deleted, if we delete the one at highest version. + kv := y.NewKV(a) + kv.Key = ka + kv.Version = item.Version() + list.Kv = append(list.Kv, kv) + itr.Next() return list, nil } - var wg sync.WaitGroup - builderMap := make(map[uint32]*skl.Builder) - initSize := int64(float64(db.opt.MemTableSize) * 1.1) - - handover := func(force bool) error { - for id, b := range builderMap { - sl := b.Skiplist() - if force || sl.MemSize() > db.opt.MemTableSize { - wg.Add(1) - if err := db.HandoverSkiplist(sl, wg.Done); err != nil { - return err - } - // Create a fresh builder. - builderMap[id] = skl.NewBuilder(initSize) - } - } - return nil - } - stream.Send = func(buf *z.Buffer) error { - err := buf.SliceIterate(func(s []byte) error { - var kv pb.KV - if err := kv.Unmarshal(s); err != nil { - return err - } - if _, ok := builderMap[kv.StreamId]; !ok { - builderMap[kv.StreamId] = skl.NewBuilder(initSize) - } - builderMap[kv.StreamId].Add(y.KeyWithTs(kv.Key, ts), y.ValueStruct{Meta: bitDelete}) - return nil - }) - if err != nil { - return err - } + sz := buf.LenNoPadding() + dst := cbuf.Allocate(sz) + y.AssertTrue(sz == copy(dst, buf.Bytes())) return handover(false) } if err := stream.Orchestrate(context.Background()); err != nil { return err } // Flush the remaining skiplists if any. - if err := handover(true); err != nil { - return err - } - wg.Wait() - return nil + return handover(true) } // Iterate over all the prefixes and logically drop them. @@ -1940,9 +1938,21 @@ func (db *DB) DropPrefixNonBlocking(ts uint64, prefixes ...[]byte) error { return errors.Wrapf(err, "While dropping prefix: %#x", prefix) } } + + wg.Wait() return nil } +// DropPrefix would drop all the keys with the provided prefix. Based on DB options, it either drops +// the prefixes by blocking the writes or doing a logical drop. +// See DropPrefixBlocking and DropPrefixNonBlocking for more information. +func (db *DB) DropPrefix(prefixes ...[]byte) error { + if db.opt.BlockWritesOnDrop { + return db.DropPrefixBlocking(prefixes...) + } + return db.DropPrefixNonBlocking(prefixes...) +} + // DropPrefix would drop all the keys with the provided prefix. It does this in the following way: // - Stop accepting new writes. // - Stop memtable flushes before acquiring lock. Because we're acquring lock here @@ -1954,7 +1964,7 @@ func (db *DB) DropPrefixNonBlocking(ts uint64, prefixes ...[]byte) error { // - Compact L0->L1, skipping over Kp. // - Compact rest of the levels, Li->Li, picking tables which have Kp. // - Resume memtable flushes, compactions and writes. -func (db *DB) DropPrefix(prefixes ...[]byte) error { +func (db *DB) DropPrefixBlocking(prefixes ...[]byte) error { if len(prefixes) == 0 { return nil } diff --git a/db2_test.go b/db2_test.go index 4f7c3cae0..a814a211e 100644 --- a/db2_test.go +++ b/db2_test.go @@ -1062,7 +1062,7 @@ func TestDropPrefixNonBlocking(t *testing.T) { require.NoError(t, err) defer removeDir(dir) - db, err := OpenManaged(DefaultOptions(dir)) + db, err := OpenManaged(DefaultOptions(dir).WithBlockWritesOnDrop(false)) require.NoError(t, err) defer db.Close() @@ -1081,6 +1081,7 @@ func TestDropPrefixNonBlocking(t *testing.T) { read := func() { txn := db.NewTransactionAt(6, false) + defer txn.Discard() iterOpts := DefaultIteratorOptions iterOpts.Prefix = []byte("aa") it := txn.NewIterator(iterOpts) @@ -1088,6 +1089,7 @@ func TestDropPrefixNonBlocking(t *testing.T) { cnt := 0 for it.Rewind(); it.Valid(); it.Next() { + fmt.Printf("%+v", it.Item()) cnt++ } @@ -1096,7 +1098,7 @@ func TestDropPrefixNonBlocking(t *testing.T) { write() prefixes := [][]byte{[]byte("aa")} - require.NoError(t, db.DropPrefixNonBlocking(5, prefixes...)) + require.NoError(t, db.DropPrefix(prefixes...)) read() // Writing again at same timestamp and verifying that vlog rewrites don't allow us to read @@ -1135,6 +1137,7 @@ func TestDropPrefixNonBlockingNoError(t *testing.T) { } else if !shouldFail { require.NoError(t, err) } + txn.Discard() } } } @@ -1142,13 +1145,13 @@ func TestDropPrefixNonBlockingNoError(t *testing.T) { closer := z.NewCloser(1) go writer(db, true, closer) time.Sleep(time.Millisecond * 100) - require.NoError(t, db.DropPrefix([]byte("aa"))) + require.NoError(t, db.DropPrefixBlocking([]byte("aa"))) closer.SignalAndWait() closer2 := z.NewCloser(1) go writer(db, false, closer2) time.Sleep(time.Millisecond * 50) prefixes := [][]byte{[]byte("aa")} - require.NoError(t, db.DropPrefixNonBlocking(atomic.AddUint64(&clock, 1), prefixes...)) + require.NoError(t, db.DropPrefixNonBlocking(prefixes...)) closer2.SignalAndWait() } diff --git a/options.go b/options.go index 3f9ba3395..019e639f4 100644 --- a/options.go +++ b/options.go @@ -104,6 +104,9 @@ type Options struct { // ChecksumVerificationMode decides when db should verify checksums for SSTable blocks. ChecksumVerificationMode options.ChecksumVerificationMode + // BlockWritesOnDrop determines whether the DropPrefix will be blocking/non-blocking. + BlockWritesOnDrop bool + // DetectConflicts determines whether the transactions would be checked for // conflicts. The transactions can be processed at a higher rate when // conflict detection is disabled. @@ -140,6 +143,7 @@ func DefaultOptions(path string) Options { MaxLevels: 7, NumGoroutines: 8, MetricsEnabled: true, + BlockWritesOnDrop: true, NumCompactors: 4, // Run at least 2 compactors. Zero-th compactor prioritizes L0. NumLevelZeroTables: 5, @@ -674,6 +678,20 @@ func (opt Options) WithChecksumVerificationMode(cvMode options.ChecksumVerificat return opt } +// WithDropMode returns a new Options value with DropMode set to the given value. +// +// BlockWritesOnDrop indicates whether the call to DropPrefix should block the writes or not. +// When set to false, the DropPrefix will do a logical delete and will not block +// the writes. Although, this will not immediately clear up the LSM tree. +// When set to false, the DropPrefix will block the writes and will clear up the LSM +// tree. +// +// The default value of BlockWritesOnDrop is true. +func (opt Options) WithBlockWritesOnDrop(b bool) Options { + opt.BlockWritesOnDrop = b + return opt +} + // WithBlockCacheSize returns a new Options value with BlockCacheSize set to the given value. // // This value specifies how much data cache should hold in memory. A small size From 74e15938c385420df2f3c4bd7b29bf7b5dd7ded7 Mon Sep 17 00:00:00 2001 From: NamanJain8 Date: Tue, 4 May 2021 23:26:05 +0530 Subject: [PATCH 6/7] dont use go memory --- db.go | 45 ++++++++++++++++++++++----------------------- db2_test.go | 7 +------ go.mod | 2 +- go.sum | 6 ++++-- options.go | 16 ++++++++-------- 5 files changed, 36 insertions(+), 40 deletions(-) diff --git a/db.go b/db.go index c1ecdd87f..e05e22288 100644 --- a/db.go +++ b/db.go @@ -1861,29 +1861,20 @@ func (db *DB) DropPrefixNonBlocking(prefixes ...[]byte) error { return nil } - var kvs []*pb.KV - err := cbuf.SliceIterate(func(slice []byte) error { - var kv pb.KV - if err := kv.Unmarshal(slice); err != nil { - return err - } - kvs = append(kvs, &kv) + // Sort the kvs, add them to the builder, and hand it over to DB. + cbuf.SortSlice(func(left, right []byte) bool { + return y.CompareKeys(left, right) < 0 + }) + + b := skl.NewBuilder(db.opt.MemTableSize) + err := cbuf.SliceIterate(func(s []byte) error { + b.Add(s, y.ValueStruct{Meta: bitDelete}) return nil }) if err != nil { return err } cbuf.Reset() - - // TODO: Maybe move the rest of it to a separate go routine, if it is slow. - // Sort the kvs, add them to the builder, and hand it over to DB. - b := skl.NewBuilder(db.opt.MemTableSize) - sort.Slice(kvs, func(i, j int) bool { - return bytes.Compare(kvs[i].Key, kvs[j].Key) < 0 - }) - for _, kv := range kvs { - b.Add(y.KeyWithTs(kv.Key, kv.Version+1), y.ValueStruct{Meta: bitDelete}) - } wg.Add(1) return db.HandoverSkiplist(b.Skiplist(), wg.Done) } @@ -1912,17 +1903,25 @@ func (db *DB) DropPrefixNonBlocking(prefixes ...[]byte) error { // We need to generate only a single delete marker per key. All the versions for this // key will be considered deleted, if we delete the one at highest version. kv := y.NewKV(a) - kv.Key = ka - kv.Version = item.Version() + kv.Key = y.KeyWithTs(ka, item.Version()) list.Kv = append(list.Kv, kv) itr.Next() return list, nil } stream.Send = func(buf *z.Buffer) error { - sz := buf.LenNoPadding() - dst := cbuf.Allocate(sz) - y.AssertTrue(sz == copy(dst, buf.Bytes())) + kv := pb.KV{} + err := buf.SliceIterate(func(s []byte) error { + kv.Reset() + if err := kv.Unmarshal(s); err != nil { + return err + } + cbuf.WriteSlice(kv.Key) + return nil + }) + if err != nil { + return err + } return handover(false) } if err := stream.Orchestrate(context.Background()); err != nil { @@ -1947,7 +1946,7 @@ func (db *DB) DropPrefixNonBlocking(prefixes ...[]byte) error { // the prefixes by blocking the writes or doing a logical drop. // See DropPrefixBlocking and DropPrefixNonBlocking for more information. func (db *DB) DropPrefix(prefixes ...[]byte) error { - if db.opt.BlockWritesOnDrop { + if db.opt.AllowStopTheWorld { return db.DropPrefixBlocking(prefixes...) } return db.DropPrefixNonBlocking(prefixes...) diff --git a/db2_test.go b/db2_test.go index a814a211e..f92bc0963 100644 --- a/db2_test.go +++ b/db2_test.go @@ -1062,7 +1062,7 @@ func TestDropPrefixNonBlocking(t *testing.T) { require.NoError(t, err) defer removeDir(dir) - db, err := OpenManaged(DefaultOptions(dir).WithBlockWritesOnDrop(false)) + db, err := OpenManaged(DefaultOptions(dir).WithAllowStopTheWorld(false)) require.NoError(t, err) defer db.Close() @@ -1100,11 +1100,6 @@ func TestDropPrefixNonBlocking(t *testing.T) { prefixes := [][]byte{[]byte("aa")} require.NoError(t, db.DropPrefix(prefixes...)) read() - - // Writing again at same timestamp and verifying that vlog rewrites don't allow us to read - // these entries anyway. - write() - read() } func TestDropPrefixNonBlockingNoError(t *testing.T) { diff --git a/go.mod b/go.mod index d94c4067e..648637012 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ go 1.12 require ( github.com/DataDog/zstd v1.4.6-0.20210216161059-8cb8bacba7ba github.com/cespare/xxhash v1.1.0 - github.com/dgraph-io/ristretto v0.0.4-0.20210309073149-3836124cdc5a + github.com/dgraph-io/ristretto v0.0.4-0.20210504175135-20a958a7e034 github.com/dustin/go-humanize v1.0.0 github.com/gogo/protobuf v1.3.2 github.com/golang/protobuf v1.3.1 diff --git a/go.sum b/go.sum index f14b4a123..fea436500 100644 --- a/go.sum +++ b/go.sum @@ -7,6 +7,8 @@ github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAE github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= +github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY= +github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk= @@ -15,8 +17,8 @@ github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwc github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/dgraph-io/ristretto v0.0.4-0.20210309073149-3836124cdc5a h1:1cMMkx3iegOzbAxVl1ZZQRHk+gaCf33Y5/4I3l0NNSg= -github.com/dgraph-io/ristretto v0.0.4-0.20210309073149-3836124cdc5a/go.mod h1:MIonLggsKgZLUSt414ExgwNtlOL5MuEoAJP514mwGe8= +github.com/dgraph-io/ristretto v0.0.4-0.20210504175135-20a958a7e034 h1:WPhpbABd68W7GVsDEH2TlQOfd/2PQs9pczxp12oUiIw= +github.com/dgraph-io/ristretto v0.0.4-0.20210504175135-20a958a7e034/go.mod h1:fux0lOrBhrVCJd3lcTHsIJhq1T2rokOu6v9Vcb3Q9ug= github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA= github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= diff --git a/options.go b/options.go index 019e639f4..0f6c6def7 100644 --- a/options.go +++ b/options.go @@ -104,8 +104,8 @@ type Options struct { // ChecksumVerificationMode decides when db should verify checksums for SSTable blocks. ChecksumVerificationMode options.ChecksumVerificationMode - // BlockWritesOnDrop determines whether the DropPrefix will be blocking/non-blocking. - BlockWritesOnDrop bool + // AllowStopTheWorld determines whether the DropPrefix will be blocking/non-blocking. + AllowStopTheWorld bool // DetectConflicts determines whether the transactions would be checked for // conflicts. The transactions can be processed at a higher rate when @@ -143,7 +143,7 @@ func DefaultOptions(path string) Options { MaxLevels: 7, NumGoroutines: 8, MetricsEnabled: true, - BlockWritesOnDrop: true, + AllowStopTheWorld: true, NumCompactors: 4, // Run at least 2 compactors. Zero-th compactor prioritizes L0. NumLevelZeroTables: 5, @@ -678,17 +678,17 @@ func (opt Options) WithChecksumVerificationMode(cvMode options.ChecksumVerificat return opt } -// WithDropMode returns a new Options value with DropMode set to the given value. +// WithAllowStopTheWorld returns a new Options value with AllowStopTheWorld set to the given value. // -// BlockWritesOnDrop indicates whether the call to DropPrefix should block the writes or not. +// AllowStopTheWorld indicates whether the call to DropPrefix should block the writes or not. // When set to false, the DropPrefix will do a logical delete and will not block // the writes. Although, this will not immediately clear up the LSM tree. // When set to false, the DropPrefix will block the writes and will clear up the LSM // tree. // -// The default value of BlockWritesOnDrop is true. -func (opt Options) WithBlockWritesOnDrop(b bool) Options { - opt.BlockWritesOnDrop = b +// The default value of AllowStopTheWorld is true. +func (opt Options) WithAllowStopTheWorld(b bool) Options { + opt.AllowStopTheWorld = b return opt } From 54cc168cac2a67404c39f6845a8c8b48e4d09eb8 Mon Sep 17 00:00:00 2001 From: NamanJain8 Date: Wed, 5 May 2021 00:44:37 +0530 Subject: [PATCH 7/7] update ristretto --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 648637012..0e2a361e8 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ go 1.12 require ( github.com/DataDog/zstd v1.4.6-0.20210216161059-8cb8bacba7ba github.com/cespare/xxhash v1.1.0 - github.com/dgraph-io/ristretto v0.0.4-0.20210504175135-20a958a7e034 + github.com/dgraph-io/ristretto v0.0.4-0.20210504190834-0bf2acd73aa3 github.com/dustin/go-humanize v1.0.0 github.com/gogo/protobuf v1.3.2 github.com/golang/protobuf v1.3.1 diff --git a/go.sum b/go.sum index fea436500..e97ba7656 100644 --- a/go.sum +++ b/go.sum @@ -17,8 +17,8 @@ github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwc github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/dgraph-io/ristretto v0.0.4-0.20210504175135-20a958a7e034 h1:WPhpbABd68W7GVsDEH2TlQOfd/2PQs9pczxp12oUiIw= -github.com/dgraph-io/ristretto v0.0.4-0.20210504175135-20a958a7e034/go.mod h1:fux0lOrBhrVCJd3lcTHsIJhq1T2rokOu6v9Vcb3Q9ug= +github.com/dgraph-io/ristretto v0.0.4-0.20210504190834-0bf2acd73aa3 h1:jU/wpYsEL+8JPLf/QcjkQKI5g0dOjSuwcMjkThxt5x0= +github.com/dgraph-io/ristretto v0.0.4-0.20210504190834-0bf2acd73aa3/go.mod h1:fux0lOrBhrVCJd3lcTHsIJhq1T2rokOu6v9Vcb3Q9ug= github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA= github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=