Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(dropPrefix): add DropPrefixNonBlocking API #1698

Merged
merged 7 commits into from
May 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 120 additions & 4 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -763,6 +763,8 @@ var requestPool = sync.Pool{
}

func (db *DB) writeToLSM(b *request) error {
db.lock.RLock()
defer db.lock.RUnlock()
for i, entry := range b.Entries {
var err error
if db.opt.managedTxns || entry.skipVlogAndSetThreshold(db.valueThreshold()) {
Expand Down Expand Up @@ -1036,10 +1038,9 @@ func (db *DB) HandoverSkiplist(skl *skl.Skiplist, callback func()) error {

// Iterate over the skiplist and send the entries to the publisher.
it := skl.NewIterator()
it.SeekToFirst()

var entries []*Entry
for it.Valid() {
for it.SeekToFirst(); it.Valid(); it.Next() {
v := it.Value()
e := &Entry{
Key: it.Key(),
Expand All @@ -1048,7 +1049,6 @@ func (db *DB) HandoverSkiplist(skl *skl.Skiplist, callback func()) error {
UserMeta: v.UserMeta,
}
entries = append(entries, e)
it.Next()
}
req := &request{
Entries: entries,
Expand Down Expand Up @@ -1836,6 +1836,122 @@ func (db *DB) dropAll() (func(), error) {
return resume, nil
}

// DropPrefixNonBlocking would logically drop all the keys with the provided prefix. The data would
// not be cleared from LSM tree immediately. It would be deleted eventually through compactions.
// This operation is useful when we don't want to block writes while we delete the prefixes.
// It does this in the following way:
// - Stream the given prefixes at a given ts.
// - Write them to skiplist at the specified ts and handover that skiplist to DB.
//
// It returns an error if the DB was opened in read-only mode, and returns nil without doing any
// work when no prefixes are given. On success it returns only after every skiplist handed over
// to the DB has invoked its completion callback.
func (db *DB) DropPrefixNonBlocking(prefixes ...[]byte) error {
	if db.opt.ReadOnly {
		return errors.New("Attempting to drop data in read-only mode.")
	}

	if len(prefixes) == 0 {
		return nil
	}
	db.opt.Infof("Non-blocking DropPrefix called for %s", prefixes)

	// cbuf accumulates the versioned keys for which a delete marker must be written.
	cbuf := z.NewBuffer(int(db.opt.MemTableSize), "DropPrefixNonBlocking")
	defer cbuf.Release()

	// wg counts the skiplists handed over to the DB; wg.Done is passed as the handover callback.
	var wg sync.WaitGroup
	// handover turns the buffered keys into a skiplist of delete markers and hands it to the DB.
	// Unless force is set, it is a no-op until the buffer has reached MemTableSize.
	handover := func(force bool) error {
		if !force && int64(cbuf.LenNoPadding()) < db.opt.MemTableSize {
			return nil
		}
		// Nothing is buffered (e.g. the prefix matched no live keys, or the previous handover
		// already drained the buffer), so there is no delete marker to write and no reason to
		// hand an empty skiplist to the DB.
		if cbuf.LenNoPadding() == 0 {
			return nil
		}

		// Sort the kvs, add them to the builder, and hand it over to DB.
		cbuf.SortSlice(func(left, right []byte) bool {
			return y.CompareKeys(left, right) < 0
		})

		b := skl.NewBuilder(db.opt.MemTableSize)
		err := cbuf.SliceIterate(func(s []byte) error {
			b.Add(s, y.ValueStruct{Meta: bitDelete})
			return nil
		})
		if err != nil {
			return err
		}
		cbuf.Reset()
		// Add before the handover so the callback can never run ahead of the counter.
		wg.Add(1)
		return db.HandoverSkiplist(b.Skiplist(), wg.Done)
	}

	// dropPrefix streams every live key under prefix and buffers one delete marker per key.
	dropPrefix := func(prefix []byte) error {
		stream := db.NewStreamAt(math.MaxUint64)
		stream.LogPrefix = fmt.Sprintf("Dropping prefix: %#x", prefix)
		stream.Prefix = prefix
		// We don't need anything except key and version.
		stream.KeyToList = func(key []byte, itr *Iterator) (*pb.KVList, error) {
			if !itr.Valid() {
				return nil, nil
			}
			item := itr.Item()
			if item.IsDeletedOrExpired() {
				return nil, nil
			}
			if !bytes.Equal(key, item.Key()) {
				// Return on the encounter with another key.
				return nil, nil
			}

			a := itr.Alloc
			ka := a.Copy(key)
			list := &pb.KVList{}
			// We need to generate only a single delete marker per key. All the versions for this
			// key will be considered deleted, if we delete the one at highest version.
			kv := y.NewKV(a)
			kv.Key = y.KeyWithTs(ka, item.Version())
			list.Kv = append(list.Kv, kv)
			itr.Next()
			return list, nil
		}

		// Send copies each streamed key into cbuf and hands the buffer over once it is full.
		stream.Send = func(buf *z.Buffer) error {
			kv := pb.KV{}
			err := buf.SliceIterate(func(s []byte) error {
				kv.Reset()
				if err := kv.Unmarshal(s); err != nil {
					return err
				}
				cbuf.WriteSlice(kv.Key)
				return nil
			})
			if err != nil {
				return err
			}
			return handover(false)
		}
		if err := stream.Orchestrate(context.Background()); err != nil {
			return err
		}
		// Flush the remaining skiplists if any.
		return handover(true)
	}

	// Iterate over all the prefixes and logically drop them.
	for _, prefix := range prefixes {
		if err := dropPrefix(prefix); err != nil {
			return errors.Wrapf(err, "While dropping prefix: %#x", prefix)
		}
	}

	// Block until every handed-over skiplist has invoked its callback.
	wg.Wait()
	return nil
}

// DropPrefix removes every key that starts with one of the given prefixes. The strategy is
// selected by the AllowStopTheWorld option: when it is set, the call is forwarded to
// DropPrefixBlocking; otherwise it is forwarded to DropPrefixNonBlocking.
// See those two methods for the details of each strategy.
func (db *DB) DropPrefix(prefixes ...[]byte) error {
	if !db.opt.AllowStopTheWorld {
		return db.DropPrefixNonBlocking(prefixes...)
	}
	return db.DropPrefixBlocking(prefixes...)
}

// DropPrefixBlocking would drop all the keys with the provided prefix. It does this in the following way:
// - Stop accepting new writes.
// - Stop memtable flushes before acquiring lock. Because we're acquiring lock here
Expand All @@ -1847,7 +1963,7 @@ func (db *DB) dropAll() (func(), error) {
// - Compact L0->L1, skipping over Kp.
// - Compact rest of the levels, Li->Li, picking tables which have Kp.
// - Resume memtable flushes, compactions and writes.
func (db *DB) DropPrefix(prefixes ...[]byte) error {
func (db *DB) DropPrefixBlocking(prefixes ...[]byte) error {
if len(prefixes) == 0 {
return nil
}
Expand Down
95 changes: 95 additions & 0 deletions db2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"regexp"
"runtime"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -1055,3 +1056,97 @@ func TestKeyCount(t *testing.T) {
require.NoError(t, stream.Orchestrate(context.Background()))
require.Equal(t, N, uint64(count))
}

// TestDropPrefixNonBlocking writes four keys, drops the "aa" prefix through DropPrefix on a DB
// opened with AllowStopTheWorld disabled, and checks that iterating over "aa" yields nothing.
func TestDropPrefixNonBlocking(t *testing.T) {
	dir, err := ioutil.TempDir("", "badger-test")
	require.NoError(t, err)
	defer removeDir(dir)

	opts := DefaultOptions(dir).WithAllowStopTheWorld(false)
	db, err := OpenManaged(opts)
	require.NoError(t, err)
	defer db.Close()

	val := []byte("value")

	// populate commits all four keys in a single transaction at ts 2.
	populate := func() {
		txn := db.NewTransactionAt(1, true)
		defer txn.Discard()
		for _, key := range []string{"aaa", "aab", "aba", "aca"} {
			require.NoError(t, txn.Set([]byte(key), val))
		}
		require.NoError(t, txn.CommitAt(2, nil))
	}

	// verifyDropped reads at ts 6 and expects no item under the "aa" prefix.
	verifyDropped := func() {
		txn := db.NewTransactionAt(6, false)
		defer txn.Discard()

		iterOpts := DefaultIteratorOptions
		iterOpts.Prefix = []byte("aa")
		it := txn.NewIterator(iterOpts)
		defer it.Close()

		seen := 0
		it.Rewind()
		for it.Valid() {
			fmt.Printf("%+v", it.Item())
			seen++
			it.Next()
		}

		require.Equal(t, 0, seen)
	}

	populate()
	require.NoError(t, db.DropPrefix([]byte("aa")))
	verifyDropped()
}

// TestDropPrefixNonBlockingNoError runs a background writer against both drop-prefix variants.
// While DropPrefixBlocking runs, commits are allowed to fail, but only with ErrBlockedWrites.
// While DropPrefixNonBlocking runs, every commit must succeed.
func TestDropPrefixNonBlockingNoError(t *testing.T) {
	dir, err := ioutil.TempDir("", "badger-test")
	require.NoError(t, err)
	defer removeDir(dir)

	opt := DefaultOptions(dir)
	db, err := OpenManaged(opt)
	require.NoError(t, err)
	defer db.Close()

	// clock hands out transaction timestamps; it is only accessed atomically.
	clock := uint64(1)

	// writer keeps committing the key "aaa" until closer is signalled. It runs on its own
	// goroutine, so failures are reported with t.Errorf followed by a return: the require
	// helpers call t.FailNow, which must only be called from the goroutine running the test.
	writer := func(db *DB, shouldFail bool, closer *z.Closer) {
		val := []byte("value")
		defer closer.Done()
		// Insert key-values
		for {
			select {
			case <-closer.HasBeenClosed():
				return
			default:
			}

			txn := db.NewTransactionAt(atomic.AddUint64(&clock, 1), true)
			if err := txn.SetEntry(NewEntry([]byte("aaa"), val)); err != nil {
				txn.Discard()
				t.Errorf("SetEntry failed: %v", err)
				return
			}

			err := txn.CommitAt(atomic.AddUint64(&clock, 1), nil)
			txn.Discard()
			switch {
			case err == nil:
				// Successful commit is acceptable in both modes.
			case !shouldFail:
				t.Errorf("commit must not fail during non-blocking drop: %v", err)
				return
			case err != ErrBlockedWrites:
				// Compare the error value itself; passing ErrBlockedWrites as the trailing
				// argument of require.Error would only use it as a failure message.
				t.Errorf("expected %v, got %v", ErrBlockedWrites, err)
				return
			}
		}
	}

	closer := z.NewCloser(1)
	go writer(db, true, closer)
	time.Sleep(time.Millisecond * 100)
	require.NoError(t, db.DropPrefixBlocking([]byte("aa")))
	closer.SignalAndWait()

	closer2 := z.NewCloser(1)
	go writer(db, false, closer2)
	time.Sleep(time.Millisecond * 50)
	prefixes := [][]byte{[]byte("aa")}
	require.NoError(t, db.DropPrefixNonBlocking(prefixes...))
	closer2.SignalAndWait()
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ go 1.12
require (
github.com/DataDog/zstd v1.4.6-0.20210216161059-8cb8bacba7ba
github.com/cespare/xxhash v1.1.0
github.com/dgraph-io/ristretto v0.0.4-0.20210309073149-3836124cdc5a
github.com/dgraph-io/ristretto v0.0.4-0.20210504190834-0bf2acd73aa3
github.com/dustin/go-humanize v1.0.0
github.com/gogo/protobuf v1.3.2
github.com/golang/protobuf v1.3.1
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAE
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk=
Expand All @@ -15,8 +17,8 @@ github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwc
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgraph-io/ristretto v0.0.4-0.20210309073149-3836124cdc5a h1:1cMMkx3iegOzbAxVl1ZZQRHk+gaCf33Y5/4I3l0NNSg=
github.com/dgraph-io/ristretto v0.0.4-0.20210309073149-3836124cdc5a/go.mod h1:MIonLggsKgZLUSt414ExgwNtlOL5MuEoAJP514mwGe8=
github.com/dgraph-io/ristretto v0.0.4-0.20210504190834-0bf2acd73aa3 h1:jU/wpYsEL+8JPLf/QcjkQKI5g0dOjSuwcMjkThxt5x0=
github.com/dgraph-io/ristretto v0.0.4-0.20210504190834-0bf2acd73aa3/go.mod h1:fux0lOrBhrVCJd3lcTHsIJhq1T2rokOu6v9Vcb3Q9ug=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
Expand Down
18 changes: 18 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ type Options struct {
// ChecksumVerificationMode decides when db should verify checksums for SSTable blocks.
ChecksumVerificationMode options.ChecksumVerificationMode

// AllowStopTheWorld determines whether DropPrefix blocks writes (true) or performs a
// non-blocking logical drop (false).
AllowStopTheWorld bool

// DetectConflicts determines whether the transactions would be checked for
// conflicts. The transactions can be processed at a higher rate when
// conflict detection is disabled.
Expand Down Expand Up @@ -140,6 +143,7 @@ func DefaultOptions(path string) Options {
MaxLevels: 7,
NumGoroutines: 8,
MetricsEnabled: true,
AllowStopTheWorld: true,

NumCompactors: 4, // Run at least 2 compactors. Zero-th compactor prioritizes L0.
NumLevelZeroTables: 5,
Expand Down Expand Up @@ -674,6 +678,20 @@ func (opt Options) WithChecksumVerificationMode(cvMode options.ChecksumVerificat
return opt
}

// WithAllowStopTheWorld returns a new Options value with AllowStopTheWorld set to the given value.
//
// AllowStopTheWorld indicates whether the call to DropPrefix should block the writes or not.
// When set to false, the DropPrefix will do a logical delete and will not block
// the writes. Although, this will not immediately clear up the LSM tree.
// When set to true, the DropPrefix will block the writes and will clear up the LSM
// tree.
//
// The default value of AllowStopTheWorld is true.
func (opt Options) WithAllowStopTheWorld(b bool) Options {
	opt.AllowStopTheWorld = b
	return opt
}

// WithBlockCacheSize returns a new Options value with BlockCacheSize set to the given value.
//
// This value specifies how much data cache should hold in memory. A small size
Expand Down
12 changes: 11 additions & 1 deletion table/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,16 @@ func NewTableBuilder(opts Options) *Builder {
return b
}

// maxEncodedLen returns the size to budget for a block of sz bytes under the given compression
// type: the value reported by snappy.MaxEncodedLen for Snappy, the value reported by
// y.ZSTDCompressBound for ZSTD, and sz itself for any other type.
func maxEncodedLen(ctype options.CompressionType, sz int) int {
	if ctype == options.Snappy {
		return snappy.MaxEncodedLen(sz)
	}
	if ctype == options.ZSTD {
		return y.ZSTDCompressBound(sz)
	}
	// Any other compression type: the size is returned unchanged.
	return sz
}

func (b *Builder) handleBlock() {
defer b.wg.Done()

Expand All @@ -175,7 +185,7 @@ func (b *Builder) handleBlock() {
// BlockBuf should always be less than or equal to allocated space. If the blockBuf is greater
// than allocated space that means the data from this block cannot be stored in its
// existing location.
allocatedSpace := (item.end) + padding + 1
allocatedSpace := maxEncodedLen(b.opts.Compression, (item.end)) + padding + 1
y.AssertTrue(len(blockBuf) <= allocatedSpace)

// blockBuf was allocated on allocator. So, we don't need to copy it over.
Expand Down