Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(dropPrefix): add DropPrefixNonBlocking API #1698

Merged
merged 7 commits into from
May 4, 2021
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 80 additions & 3 deletions db.go
Original file line number Diff line number Diff line change
@@ -1036,10 +1036,9 @@ func (db *DB) HandoverSkiplist(skl *skl.Skiplist, callback func()) error {

// Iterate over the skiplist and send the entries to the publisher.
it := skl.NewIterator()
it.SeekToFirst()

var entries []*Entry
for it.Valid() {
for it.SeekToFirst(); it.Valid(); it.Next() {
v := it.Value()
e := &Entry{
Key: it.Key(),
@@ -1048,7 +1047,6 @@ func (db *DB) HandoverSkiplist(skl *skl.Skiplist, callback func()) error {
UserMeta: v.UserMeta,
}
entries = append(entries, e)
it.Next()
}
req := &request{
Entries: entries,
@@ -1836,6 +1834,85 @@ func (db *DB) dropAll() (func(), error) {
return resume, nil
}

// DropPrefixNonBlocking would logically drop all the keys with the provided prefix. The data would
// not be cleared from LSM tree immediately. It would be deleted eventually through compactions.
// This operation is useful when we don't want to block writes while we delete the prefixes.
// It does this in the following way:
// - Stream the given prefixes at a given timestamp.
// - Write them to skiplist and handover that skiplist to DB.
func (db *DB) DropPrefixNonBlocking(readTs uint64, prefixes ...[]byte) error {
if db.opt.ReadOnly {
panic("Attempting to drop data in read-only mode.")
}

if len(prefixes) == 0 {
return nil
}
db.opt.Infof("Non-blocking DropPrefix called for %s", prefixes)

dropPrefix := func(prefix []byte) error {
stream := db.NewStreamAt(readTs)
stream.LogPrefix = fmt.Sprintf("Dropping prefix: %#x", prefix)
stream.Prefix = prefix
// Use the default implementation with some changes. We don't need anything except key.
stream.KeyToList = func(key []byte, itr *Iterator) (*pb.KVList, error) {
a := itr.Alloc
ka := a.Copy(key)

list := &pb.KVList{}
for ; itr.Valid(); itr.Next() {
item := itr.Item()
if item.IsDeletedOrExpired() {
break
}
if !bytes.Equal(key, item.Key()) {
// Break out on the first encounter with another key.
break
}

kv := y.NewKV(a)
kv.Key = ka
list.Kv = append(list.Kv, kv)

if db.opt.NumVersionsToKeep == 1 {
break
}

if item.DiscardEarlierVersions() {
break
}
}
return list, nil
}
stream.Send = func(buf *z.Buffer) error {
// Stream framework already batches the key values.
b := skl.NewBuilder(1 << 20) // TODO: Maybe figure out the optimal value.
err := buf.SliceIterate(func(s []byte) error {
var kv pb.KV
if err := kv.Unmarshal(s); err != nil {
return err
}
b.Add(y.KeyWithTs(kv.Key, readTs), y.ValueStruct{Meta: bitDelete})
return nil
})
if err != nil {
return err
}
return db.HandoverSkiplist(b.Skiplist(), nil)
}

return stream.Orchestrate(context.Background())
}

// Iterate over all the prefixes and logically drop them.
for _, prefix := range prefixes {
if err := dropPrefix(prefix); err != nil {
return errors.Wrapf(err, "While dropping prefix: %#x", prefix)
}
}
return nil
}

// DropPrefix would drop all the keys with the provided prefix. It does this in the following way:
// - Stop accepting new writes.
// - Stop memtable flushes before acquiring lock. Because we're acquring lock here