From 02164874d986dd833b91827717ad710aef4fbaf1 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Sat, 17 Jun 2017 18:30:24 -0700 Subject: [PATCH 1/2] mvcc: test restore and deletes with small chunk sizes --- mvcc/kvstore.go | 6 ++--- mvcc/kvstore_test.go | 52 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 55 insertions(+), 3 deletions(-) diff --git a/mvcc/kvstore.go b/mvcc/kvstore.go index d6def51bda3..9e75abc37ae 100644 --- a/mvcc/kvstore.go +++ b/mvcc/kvstore.go @@ -53,10 +53,10 @@ const ( markedRevBytesLen = revBytesLen + 1 markBytePosition = markedRevBytesLen - 1 markTombstone byte = 't' - - restoreChunkKeys = 10000 ) +var restoreChunkKeys = 10000 // non-const for testing + // ConsistentIndexGetter is an interface that wraps the Get method. // Consistent index is the offset of an entry in a consistent replicated log. type ConsistentIndexGetter interface { @@ -286,7 +286,7 @@ func (s *store) restore() error { } }() for { - keys, vals := tx.UnsafeRange(keyBucketName, min, max, restoreChunkKeys) + keys, vals := tx.UnsafeRange(keyBucketName, min, max, int64(restoreChunkKeys)) if len(keys) == 0 { break } diff --git a/mvcc/kvstore_test.go b/mvcc/kvstore_test.go index 2d85d8baffa..a2f540977cd 100644 --- a/mvcc/kvstore_test.go +++ b/mvcc/kvstore_test.go @@ -17,7 +17,9 @@ package mvcc import ( "crypto/rand" "encoding/binary" + "fmt" "math" + mrand "math/rand" "os" "reflect" "testing" @@ -408,6 +410,56 @@ func TestStoreRestore(t *testing.T) { } } +func TestRestoreDelete(t *testing.T) { + oldChunk := restoreChunkKeys + restoreChunkKeys = mrand.Intn(3) + 2 + defer func() { restoreChunkKeys = oldChunk }() + + b, tmpPath := backend.NewDefaultTmpBackend() + s := NewStore(b, &lease.FakeLessor{}, nil) + defer os.Remove(tmpPath) + + keys := make(map[string]struct{}) + for i := 0; i < 20; i++ { + ks := fmt.Sprintf("foo-%d", i) + k := []byte(ks) + s.Put(k, []byte("bar"), lease.NoLease) + keys[ks] = struct{}{} + switch mrand.Intn(3) { + case 0: + // put random key from past via random range on map + ks = fmt.Sprintf("foo-%d", mrand.Intn(i+1)) + s.Put([]byte(ks), []byte("baz"), lease.NoLease) + keys[ks] = struct{}{} + case 1: + // delete random key via random range on map + for k := range keys { + s.DeleteRange([]byte(k), nil) + delete(keys, k) + break + } + } + } + s.Close() + + s = NewStore(b, &lease.FakeLessor{}, nil) + defer s.Close() + for i := 0; i < 20; i++ { + ks := fmt.Sprintf("foo-%d", i) + r, err := s.Range([]byte(ks), nil, RangeOptions{}) + if err != nil { + t.Fatal(err) + } + if _, ok := keys[ks]; ok { + if len(r.KVs) == 0 { + t.Errorf("#%d: expected %q, got deleted", i, ks) + } + } else if len(r.KVs) != 0 { + t.Errorf("#%d: expected deleted, got %q", i, ks) + } + } +} + func TestRestoreContinueUnfinishedCompaction(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() s0 := NewStore(b, &lease.FakeLessor{}, nil) From 51a568aa81abb61a0f57565138e2bdbd1be57617 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Sat, 17 Jun 2017 19:36:07 -0700 Subject: [PATCH 2/2] mvcc: restore into tree index with one key index Clobbering the mvcc kvindex with new keyIndexes for each restore chunk would cause index corruption by dropping historical information. --- mvcc/index.go | 21 ++++++--- mvcc/kvstore.go | 105 +++++++++++++++++++++++++++---------------- mvcc/kvstore_test.go | 6 +++ 3 files changed, 88 insertions(+), 44 deletions(-) diff --git a/mvcc/index.go b/mvcc/index.go index 397098a7ba7..991289cdd5c 100644 --- a/mvcc/index.go +++ b/mvcc/index.go @@ -29,7 +29,9 @@ type index interface { RangeSince(key, end []byte, rev int64) []revision Compact(rev int64) map[revision]struct{} Equal(b index) bool + Insert(ki *keyIndex) + KeyIndex(ki *keyIndex) *keyIndex } type treeIndex struct { @@ -60,18 +62,27 @@ func (ti *treeIndex) Put(key []byte, rev revision) { func (ti *treeIndex) Get(key []byte, atRev int64) (modified, created revision, ver int64, err error) { keyi := &keyIndex{key: key} - ti.RLock() defer ti.RUnlock() - item := ti.tree.Get(keyi) - if item == nil { + if keyi = ti.keyIndex(keyi); keyi == nil { return revision{}, revision{}, 0, ErrRevisionNotFound } - - keyi = item.(*keyIndex) return keyi.get(atRev) } +func (ti *treeIndex) KeyIndex(keyi *keyIndex) *keyIndex { + ti.RLock() + defer ti.RUnlock() + return ti.keyIndex(keyi) +} + +func (ti *treeIndex) keyIndex(keyi *keyIndex) *keyIndex { + if item := ti.tree.Get(keyi); item != nil { + return item.(*keyIndex) + } + return nil +} + func (ti *treeIndex) Range(key, end []byte, atRev int64) (keys [][]byte, revs []revision) { if end == nil { rev, _, _, err := ti.Get(key, atRev) diff --git a/mvcc/kvstore.go b/mvcc/kvstore.go index 9e75abc37ae..d7e42d164c9 100644 --- a/mvcc/kvstore.go +++ b/mvcc/kvstore.go @@ -275,23 +275,15 @@ func (s *store) restore() error { } // index keys concurrently as they're loaded in from tx - unorderedc, donec := make(chan map[string]*keyIndex), make(chan struct{}) - go func() { - defer close(donec) - for unordered := range unorderedc { - // restore the tree index from the unordered index. - for _, v := range unordered { - s.kvindex.Insert(v) - } - } - }() + rkvc, revc := restoreIntoIndex(s.kvindex) for { keys, vals := tx.UnsafeRange(keyBucketName, min, max, int64(restoreChunkKeys)) if len(keys) == 0 { break } - // unbuffered so keys don't pile up in memory - unorderedc <- s.restoreChunk(keys, vals, keyToLease) + // rkvc blocks if the total pending keys exceeds the restore + // chunk size to keep keys from consuming too much memory. + restoreChunk(rkvc, keys, vals, keyToLease) if len(keys) < restoreChunkKeys { // partial set implies final set break @@ -301,8 +293,8 @@ func (s *store) restore() error { newMin.sub++ revToBytes(newMin, min) } - close(unorderedc) - <-donec + close(rkvc) + s.currentRev = <-revc // keys in the range [compacted revision -N, compaction] might all be deleted due to compaction. // the correct revision should be set to compaction revision in the case, not the largest revision @@ -334,38 +326,73 @@ func (s *store) restore() error { return nil } -func (s *store) restoreChunk(keys, vals [][]byte, keyToLease map[string]lease.LeaseID) map[string]*keyIndex { - // assume half of keys are overwrites - unordered := make(map[string]*keyIndex, len(keys)/2) +type revKeyValue struct { + key []byte + kv mvccpb.KeyValue + kstr string +} + +func restoreIntoIndex(idx index) (chan<- revKeyValue, <-chan int64) { + rkvc, revc := make(chan revKeyValue, restoreChunkKeys), make(chan int64, 1) + go func() { + currentRev := int64(1) + defer func() { revc <- currentRev }() + // restore the tree index from streaming the unordered index. + kiCache := make(map[string]*keyIndex, restoreChunkKeys) + for rkv := range rkvc { + ki, ok := kiCache[rkv.kstr] + // purge kiCache if many keys but still missing in the cache + if !ok && len(kiCache) >= restoreChunkKeys { + i := 10 + for k := range kiCache { + delete(kiCache, k) + if i--; i == 0 { + break + } + } + } + // cache miss, fetch from tree index if there + if !ok { + ki = &keyIndex{key: rkv.kv.Key} + if idxKey := idx.KeyIndex(ki); idxKey != nil { + kiCache[rkv.kstr], ki = idxKey, idxKey + ok = true + } + } + rev := bytesToRev(rkv.key) + currentRev = rev.main + if ok { + if isTombstone(rkv.key) { + ki.tombstone(rev.main, rev.sub) + continue + } + ki.put(rev.main, rev.sub) + } else if !isTombstone(rkv.key) { + ki.restore(revision{rkv.kv.CreateRevision, 0}, rev, rkv.kv.Version) + idx.Insert(ki) + kiCache[rkv.kstr] = ki + } + } + }() + return rkvc, revc +} + +func restoreChunk(kvc chan<- revKeyValue, keys, vals [][]byte, keyToLease map[string]lease.LeaseID) { for i, key := range keys { - var kv mvccpb.KeyValue - if err := kv.Unmarshal(vals[i]); err != nil { + rkv := revKeyValue{key: key} + if err := rkv.kv.Unmarshal(vals[i]); err != nil { plog.Fatalf("cannot unmarshal event: %v", err) } - rev := bytesToRev(key[:revBytesLen]) - s.currentRev = rev.main - kstr := string(kv.Key) + rkv.kstr = string(rkv.kv.Key) if isTombstone(key) { - if ki, ok := unordered[kstr]; ok { - ki.tombstone(rev.main, rev.sub) - } - delete(keyToLease, kstr) - continue - } - if ki, ok := unordered[kstr]; ok { - ki.put(rev.main, rev.sub) - } else { - ki = &keyIndex{key: kv.Key} - ki.restore(revision{kv.CreateRevision, 0}, rev, kv.Version) - unordered[kstr] = ki - } - if lid := lease.LeaseID(kv.Lease); lid != lease.NoLease { - keyToLease[kstr] = lid + delete(keyToLease, rkv.kstr) + } else if lid := lease.LeaseID(rkv.kv.Lease); lid != lease.NoLease { + keyToLease[rkv.kstr] = lid } else { - delete(keyToLease, kstr) + delete(keyToLease, rkv.kstr) } + kvc <- rkv } - return unordered } func (s *store) Close() error { diff --git a/mvcc/kvstore_test.go b/mvcc/kvstore_test.go index a2f540977cd..6b73a9426f5 100644 --- a/mvcc/kvstore_test.go +++ b/mvcc/kvstore_test.go @@ -403,6 +403,7 @@ func TestStoreRestore(t *testing.T) { } ki := &keyIndex{key: []byte("foo"), modified: revision{5, 0}, generations: gens} wact = []testutil.Action{ + {"keyIndex", []interface{}{ki}}, {"insert", []interface{}{ki}}, } if g := fi.Action(); !reflect.DeepEqual(g, wact) { @@ -698,6 +699,11 @@ func (i *fakeIndex) Insert(ki *keyIndex) { i.Recorder.Record(testutil.Action{Name: "insert", Params: []interface{}{ki}}) } +func (i *fakeIndex) KeyIndex(ki *keyIndex) *keyIndex { + i.Recorder.Record(testutil.Action{Name: "keyIndex", Params: []interface{}{ki}}) + return nil +} + func createBytesSlice(bytesN, sliceN int) [][]byte { rs := [][]byte{} for len(rs) != sliceN {