Skip to content

Commit 2d2af1d

Browse files
author
Anthony Romano
committed
mvcc: restore into tree index with one key index
Clobbering the mvcc kvindex with new keyIndexes for each restore chunk would cause index corruption by dropping historical information.
1 parent 9304ae5 commit 2d2af1d

File tree

3 files changed

+83
-45
lines changed

3 files changed

+83
-45
lines changed

mvcc/index.go

+11-6
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@ type index interface {
2929
RangeSince(key, end []byte, rev int64) []revision
3030
Compact(rev int64) map[revision]struct{}
3131
Equal(b index) bool
32+
3233
Insert(ki *keyIndex)
34+
KeyIndex(ki *keyIndex) *keyIndex
3335
}
3436

3537
type treeIndex struct {
@@ -60,16 +62,19 @@ func (ti *treeIndex) Put(key []byte, rev revision) {
6062

6163
func (ti *treeIndex) Get(key []byte, atRev int64) (modified, created revision, ver int64, err error) {
6264
keyi := &keyIndex{key: key}
65+
if keyi = ti.KeyIndex(keyi); keyi == nil {
66+
return revision{}, revision{}, 0, ErrRevisionNotFound
67+
}
68+
return keyi.get(atRev)
69+
}
6370

71+
func (ti *treeIndex) KeyIndex(keyi *keyIndex) *keyIndex {
6472
ti.RLock()
6573
defer ti.RUnlock()
66-
item := ti.tree.Get(keyi)
67-
if item == nil {
68-
return revision{}, revision{}, 0, ErrRevisionNotFound
74+
if item := ti.tree.Get(keyi); item != nil {
75+
return item.(*keyIndex)
6976
}
70-
71-
keyi = item.(*keyIndex)
72-
return keyi.get(atRev)
77+
return nil
7378
}
7479

7580
func (ti *treeIndex) Range(key, end []byte, atRev int64) (keys [][]byte, revs []revision) {

mvcc/kvstore.go

+66-39
Original file line numberDiff line numberDiff line change
@@ -275,23 +275,15 @@ func (s *store) restore() error {
275275
}
276276

277277
// index keys concurrently as they're loaded in from tx
278-
unorderedc, donec := make(chan map[string]*keyIndex), make(chan struct{})
279-
go func() {
280-
defer close(donec)
281-
for unordered := range unorderedc {
282-
// restore the tree index from the unordered index.
283-
for _, v := range unordered {
284-
s.kvindex.Insert(v)
285-
}
286-
}
287-
}()
278+
rkvc, revc := restoreIntoIndex(s.kvindex)
288279
for {
289280
keys, vals := tx.UnsafeRange(keyBucketName, min, max, int64(restoreChunkKeys))
290281
if len(keys) == 0 {
291282
break
292283
}
293-
// unbuffered so keys don't pile up in memory
294-
unorderedc <- s.restoreChunk(keys, vals, keyToLease)
284+
// rkvc blocks if the total pending keys exceeds the restore
285+
// chunk size to keep keys from consuming too much memory.
286+
restoreChunk(rkvc, keys, vals, keyToLease)
295287
if len(keys) < restoreChunkKeys {
296288
// partial set implies final set
297289
break
@@ -301,8 +293,8 @@ func (s *store) restore() error {
301293
newMin.sub++
302294
revToBytes(newMin, min)
303295
}
304-
close(unorderedc)
305-
<-donec
296+
close(rkvc)
297+
s.currentRev = <-revc
306298

307299
// keys in the range [compacted revision -N, compaction] might all be deleted due to compaction.
308300
// the correct revision should be set to compaction revision in the case, not the largest revision
@@ -334,38 +326,73 @@ func (s *store) restore() error {
334326
return nil
335327
}
336328

337-
func (s *store) restoreChunk(keys, vals [][]byte, keyToLease map[string]lease.LeaseID) map[string]*keyIndex {
338-
// assume half of keys are overwrites
339-
unordered := make(map[string]*keyIndex, len(keys)/2)
329+
type revKeyValue struct {
330+
key []byte
331+
kv mvccpb.KeyValue
332+
kstr string
333+
}
334+
335+
func restoreIntoIndex(idx index) (chan<- revKeyValue, <-chan int64) {
336+
rkvc, revc := make(chan revKeyValue, restoreChunkKeys), make(chan int64, 1)
337+
go func() {
338+
currentRev := int64(1)
339+
defer func() { revc <- currentRev }()
340+
// restore the tree index from streaming the unordered index.
341+
kiCache := make(map[string]*keyIndex, restoreChunkKeys)
342+
for rkv := range rkvc {
343+
ki, ok := kiCache[rkv.kstr]
344+
// purge cache if too many keys and missing
345+
if !ok && len(kiCache) >= restoreChunkKeys {
346+
i := 10
347+
for k := range kiCache {
348+
delete(kiCache, k)
349+
if i--; i == 0 {
350+
break
351+
}
352+
}
353+
}
354+
// cache miss, fetch from tree index if there
355+
if !ok {
356+
ki = &keyIndex{key: rkv.kv.Key}
357+
if idxKey := idx.KeyIndex(ki); idxKey != nil {
358+
kiCache[rkv.kstr], ki = idxKey, idxKey
359+
ok = true
360+
}
361+
}
362+
rev := bytesToRev(rkv.key)
363+
currentRev = rev.main
364+
if ok {
365+
if isTombstone(rkv.key) {
366+
ki.tombstone(rev.main, rev.sub)
367+
continue
368+
}
369+
ki.put(rev.main, rev.sub)
370+
} else if !isTombstone(rkv.key) {
371+
ki.restore(revision{rkv.kv.CreateRevision, 0}, rev, rkv.kv.Version)
372+
idx.Insert(ki)
373+
kiCache[rkv.kstr] = ki
374+
}
375+
}
376+
}()
377+
return rkvc, revc
378+
}
379+
380+
func restoreChunk(kvc chan<- revKeyValue, keys, vals [][]byte, keyToLease map[string]lease.LeaseID) {
340381
for i, key := range keys {
341-
var kv mvccpb.KeyValue
342-
if err := kv.Unmarshal(vals[i]); err != nil {
382+
rkv := revKeyValue{key: key}
383+
if err := rkv.kv.Unmarshal(vals[i]); err != nil {
343384
plog.Fatalf("cannot unmarshal event: %v", err)
344385
}
345-
rev := bytesToRev(key[:revBytesLen])
346-
s.currentRev = rev.main
347-
kstr := string(kv.Key)
386+
rkv.kstr = string(rkv.kv.Key)
348387
if isTombstone(key) {
349-
if ki, ok := unordered[kstr]; ok {
350-
ki.tombstone(rev.main, rev.sub)
351-
}
352-
delete(keyToLease, kstr)
353-
continue
354-
}
355-
if ki, ok := unordered[kstr]; ok {
356-
ki.put(rev.main, rev.sub)
357-
} else {
358-
ki = &keyIndex{key: kv.Key}
359-
ki.restore(revision{kv.CreateRevision, 0}, rev, kv.Version)
360-
unordered[kstr] = ki
361-
}
362-
if lid := lease.LeaseID(kv.Lease); lid != lease.NoLease {
363-
keyToLease[kstr] = lid
388+
delete(keyToLease, rkv.kstr)
389+
} else if lid := lease.LeaseID(rkv.kv.Lease); lid != lease.NoLease {
390+
keyToLease[rkv.kstr] = lid
364391
} else {
365-
delete(keyToLease, kstr)
392+
delete(keyToLease, rkv.kstr)
366393
}
394+
kvc <- rkv
367395
}
368-
return unordered
369396
}
370397

371398
func (s *store) Close() error {

mvcc/kvstore_test.go

+6
Original file line numberDiff line numberDiff line change
@@ -403,6 +403,7 @@ func TestStoreRestore(t *testing.T) {
403403
}
404404
ki := &keyIndex{key: []byte("foo"), modified: revision{5, 0}, generations: gens}
405405
wact = []testutil.Action{
406+
{"keyIndex", []interface{}{ki}},
406407
{"insert", []interface{}{ki}},
407408
}
408409
if g := fi.Action(); !reflect.DeepEqual(g, wact) {
@@ -698,6 +699,11 @@ func (i *fakeIndex) Insert(ki *keyIndex) {
698699
i.Recorder.Record(testutil.Action{Name: "insert", Params: []interface{}{ki}})
699700
}
700701

702+
func (i *fakeIndex) KeyIndex(ki *keyIndex) *keyIndex {
703+
i.Recorder.Record(testutil.Action{Name: "keyIndex", Params: []interface{}{ki}})
704+
return nil
705+
}
706+
701707
func createBytesSlice(bytesN, sliceN int) [][]byte {
702708
rs := [][]byte{}
703709
for len(rs) != sliceN {

0 commit comments

Comments
 (0)