Skip to content

Commit 32230b1

Browse files
committed
opt(stream): add option to directly copy over tables from lower levels (#1700)
Also takes a bug fix from PR #1712, commit 58d0674. This PR adds a FullCopy option to Stream, which allows sending a table in its entirety to the writer. If this option is set to true, we directly copy over the tables from the last 2 levels. This option increases the stream speed while also lowering the memory consumption on the DB that is streaming the KVs. For a 71GB compressed and encrypted DB, we observed a 3x improvement in speed. The DB contained ~65GB in the last 2 levels, with the remainder in the levels above. To use this option, the following must be set: stream.KeyToList = nil, stream.ChooseKey = nil, stream.SinceTs = 0, and db.managedTxns = true. If we use a stream writer for receiving the KVs, the encryption mode has to be the same in the sender and the receiver. This restricts db.StreamDB() to using the same encryption mode in both the input and the output DB. Added a TODO for allowing different encryption modes.
1 parent 77973c6 commit 32230b1

13 files changed

+696
-159
lines changed

db.go

+1
Original file line numberDiff line numberDiff line change
@@ -1996,6 +1996,7 @@ func (db *DB) StreamDB(outOptions Options) error {
19961996
// Stream contents of DB to the output DB.
19971997
stream := db.NewStreamAt(math.MaxUint64)
19981998
stream.LogPrefix = fmt.Sprintf("Streaming DB to new DB at %s", outDir)
1999+
stream.FullCopy = true
19992000

20002001
stream.Send = func(buf *z.Buffer) error {
20012002
return writer.Write(buf)

iterator.go

+10-10
Original file line numberDiff line numberDiff line change
@@ -366,17 +366,17 @@ func (opt *IteratorOptions) pickTable(t table.TableInterface) bool {
366366
// that the tables are sorted in the right order.
367367
func (opt *IteratorOptions) pickTables(all []*table.Table) []*table.Table {
368368
filterTables := func(tables []*table.Table) []*table.Table {
369-
if opt.SinceTs > 0 {
370-
tmp := tables[:0]
371-
for _, t := range tables {
372-
if t.MaxVersion() < opt.SinceTs {
373-
continue
374-
}
375-
tmp = append(tmp, t)
369+
if opt.SinceTs == 0 {
370+
return tables
371+
}
372+
out := tables[:0]
373+
for _, t := range tables {
374+
if t.MaxVersion() < opt.SinceTs {
375+
continue
376376
}
377-
tables = tmp
377+
out = append(out, t)
378378
}
379-
return tables
379+
return out
380380
}
381381

382382
if len(opt.Prefix) == 0 {
@@ -489,7 +489,7 @@ func (txn *Txn) NewIterator(opt IteratorOptions) *Iterator {
489489
for i := 0; i < len(tables); i++ {
490490
iters = append(iters, tables[i].sl.NewUniIterator(opt.Reverse))
491491
}
492-
iters = txn.db.lc.appendIterators(iters, &opt) // This will increment references.
492+
iters = append(iters, txn.db.lc.iterators(&opt)...) // This will increment references.
493493
res := &Iterator{
494494
txn: txn,
495495
iitr: table.NewMergeIterator(iters, opt.Reverse),

key_registry.go

+47-39
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ import (
2828
"sync"
2929
"time"
3030

31+
"github.com/pkg/errors"
32+
3133
"github.com/dgraph-io/badger/v4/pb"
3234
"github.com/dgraph-io/badger/v4/y"
3335
)
@@ -264,7 +266,7 @@ func WriteKeyRegistry(reg *KeyRegistry, opt KeyRegistryOptions) error {
264266
// Write all the datakeys to the buf.
265267
for _, k := range reg.dataKeys {
266268
// Writing the datakey to the given buffer.
267-
if err := storeDataKey(buf, opt.EncryptionKey, k); err != nil {
269+
if err := storeDataKey(buf, opt.EncryptionKey, *k); err != nil {
268270
return y.Wrapf(err, "Error while storing datakey in WriteKeyRegistry")
269271
}
270272
}
@@ -338,44 +340,58 @@ func (kr *KeyRegistry) LatestDataKey() (*pb.DataKey, error) {
338340
defer kr.Unlock()
339341
// Key might have generated by another go routine. So,
340342
// checking once again.
341-
key, valid = validKey()
342-
if valid {
343+
if key, valid := validKey(); valid {
343344
return key, nil
344345
}
345346
k := make([]byte, len(kr.opt.EncryptionKey))
346347
iv, err := y.GenerateIV()
347348
if err != nil {
348349
return nil, err
349350
}
350-
_, err = rand.Read(k)
351-
if err != nil {
351+
352+
if _, err := rand.Read(k); err != nil {
352353
return nil, err
353354
}
354355
// Otherwise Increment the KeyID and generate new datakey.
355356
kr.nextKeyID++
356-
dk := &pb.DataKey{
357+
dk := pb.DataKey{
357358
KeyId: kr.nextKeyID,
358359
Data: k,
359360
CreatedAt: time.Now().Unix(),
360361
Iv: iv,
361362
}
363+
kr.lastCreated = dk.CreatedAt
364+
kr.dataKeys[kr.nextKeyID] = &dk
362365
// Don't store the datakey on file if badger is running in InMemory mode.
363-
if !kr.opt.InMemory {
364-
// Store the datekey.
365-
buf := &bytes.Buffer{}
366-
if err = storeDataKey(buf, kr.opt.EncryptionKey, dk); err != nil {
367-
return nil, err
368-
}
369-
// Persist the datakey to the disk
370-
if _, err = kr.fp.Write(buf.Bytes()); err != nil {
371-
return nil, err
372-
}
366+
if kr.opt.InMemory {
367+
return &dk, nil
368+
373369
}
374-
// storeDatakey encrypts the datakey So, placing un-encrypted key in the memory.
375-
dk.Data = k
376-
kr.lastCreated = dk.CreatedAt
377-
kr.dataKeys[kr.nextKeyID] = dk
378-
return dk, nil
370+
// Store the datekey.
371+
if err = storeDataKey(kr.fp, kr.opt.EncryptionKey, dk); err != nil {
372+
return nil, err
373+
}
374+
return &dk, nil
375+
}
376+
377+
func (kr *KeyRegistry) AddKey(dk pb.DataKey) (uint64, error) {
378+
// If we don't have a encryption key, we cannot store the datakey.
379+
if len(kr.opt.EncryptionKey) == 0 {
380+
return 0, errors.New("No encryption key found. Cannot add data key")
381+
}
382+
383+
if _, ok := kr.dataKeys[dk.KeyId]; !ok {
384+
// If KeyId does not exists already, then use the next available KeyId to store data key.
385+
kr.nextKeyID++
386+
dk.KeyId = kr.nextKeyID
387+
}
388+
kr.dataKeys[dk.KeyId] = &dk
389+
390+
if kr.opt.InMemory {
391+
return dk.KeyId, nil
392+
}
393+
// Store the datakey.
394+
return dk.KeyId, storeDataKey(kr.fp, kr.opt.EncryptionKey, dk)
379395
}
380396

381397
// Close closes the key registry.
@@ -387,38 +403,30 @@ func (kr *KeyRegistry) Close() error {
387403
}
388404

389405
// storeDataKey stores datakey in an encrypted format in the given buffer. If storage key preset.
390-
func storeDataKey(buf *bytes.Buffer, storageKey []byte, k *pb.DataKey) error {
406+
// DO NOT use a pointer for key. storeDataKey modifies the kv.Data field.
407+
func storeDataKey(w io.Writer, storageKey []byte, key pb.DataKey) error {
391408
// xor will encrypt the IV and xor with the given data.
392409
// It'll used for both encryption and decryption.
393410
xor := func() error {
394411
if len(storageKey) == 0 {
395412
return nil
396413
}
397414
var err error
398-
k.Data, err = y.XORBlockAllocate(k.Data, storageKey, k.Iv)
415+
key.Data, err = y.XORBlockAllocate(key.Data, storageKey, key.Iv)
399416
return err
400417
}
401418
// In memory datakey will be plain text so encrypting before storing to the disk.
402-
var err error
403-
if err = xor(); err != nil {
419+
if err := xor(); err != nil {
404420
return y.Wrapf(err, "Error while encrypting datakey in storeDataKey")
405421
}
406-
var data []byte
407-
if data, err = k.Marshal(); err != nil {
408-
err = y.Wrapf(err, "Error while marshaling datakey in storeDataKey")
409-
var err2 error
410-
// decrypting the datakey back.
411-
if err2 = xor(); err2 != nil {
412-
return y.Wrapf(err,
413-
y.Wrapf(err2, "Error while decrypting datakey in storeDataKey").Error())
414-
}
415-
return err
422+
data, err := key.Marshal()
423+
if err != nil {
424+
return y.Wrapf(err, "Error while marshaling datakey in storeDataKey")
416425
}
417426
var lenCrcBuf [8]byte
418427
binary.BigEndian.PutUint32(lenCrcBuf[0:4], uint32(len(data)))
419428
binary.BigEndian.PutUint32(lenCrcBuf[4:8], crc32.Checksum(data, y.CastagnoliCrcTable))
420-
y.Check2(buf.Write(lenCrcBuf[:]))
421-
y.Check2(buf.Write(data))
422-
// Decrypting the datakey back since we're using the pointer.
423-
return xor()
429+
y.Check2(w.Write(lenCrcBuf[:]))
430+
y.Check2(w.Write(data))
431+
return nil
424432
}

level_handler.go

+31-5
Original file line numberDiff line numberDiff line change
@@ -304,9 +304,9 @@ func (s *levelHandler) get(key []byte) (y.ValueStruct, error) {
304304
return maxVs, decr()
305305
}
306306

307-
// appendIterators appends iterators to an array of iterators, for merging.
307+
// iterators returns an array of iterators, for merging.
308308
// Note: This obtains references for the table handlers. Remember to close these iterators.
309-
func (s *levelHandler) appendIterators(iters []y.Iterator, opt *IteratorOptions) []y.Iterator {
309+
func (s *levelHandler) iterators(opt *IteratorOptions) []y.Iterator {
310310
s.RLock()
311311
defer s.RUnlock()
312312

@@ -324,14 +324,40 @@ func (s *levelHandler) appendIterators(iters []y.Iterator, opt *IteratorOptions)
324324
out = append(out, t)
325325
}
326326
}
327-
return appendIteratorsReversed(iters, out, topt)
327+
return iteratorsReversed(out, topt)
328328
}
329329

330330
tables := opt.pickTables(s.tables)
331331
if len(tables) == 0 {
332-
return iters
332+
return nil
333333
}
334-
return append(iters, table.NewConcatIterator(tables, topt))
334+
return []y.Iterator{table.NewConcatIterator(tables, topt)}
335+
}
336+
337+
func (s *levelHandler) getTables(opt *IteratorOptions) []*table.Table {
338+
if opt.Reverse {
339+
panic("Invalid option for getTables")
340+
}
341+
342+
s.RLock()
343+
defer s.RUnlock()
344+
345+
if s.level == 0 {
346+
var out []*table.Table
347+
for _, t := range s.tables {
348+
if opt.pickTable(t) {
349+
t.IncrRef()
350+
out = append(out, t)
351+
}
352+
}
353+
return out
354+
}
355+
356+
tables := opt.pickTables(s.tables)
357+
for _, t := range tables {
358+
t.IncrRef()
359+
}
360+
return tables
335361
}
336362

337363
type levelHandlerRLocked struct{}

levels.go

+61-7
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"github.com/pkg/errors"
3434
otrace "go.opencensus.io/trace"
3535

36+
"github.com/dgraph-io/badger/v4/options"
3637
"github.com/dgraph-io/badger/v4/pb"
3738
"github.com/dgraph-io/badger/v4/table"
3839
"github.com/dgraph-io/badger/v4/y"
@@ -895,7 +896,7 @@ func (s *levelsController) compactBuildTables(
895896
var iters []y.Iterator
896897
switch {
897898
case lev == 0:
898-
iters = appendIteratorsReversed(iters, topTables, table.NOCACHE)
899+
iters = append(iters, iteratorsReversed(topTables, table.NOCACHE)...)
899900
case len(topTables) > 0:
900901
y.AssertTrue(len(topTables) == 1)
901902
iters = []y.Iterator{topTables[0].NewIterator(table.NOCACHE)}
@@ -1608,24 +1609,34 @@ func (s *levelsController) get(key []byte, maxVs y.ValueStruct, startLevel int)
16081609
return maxVs, nil
16091610
}
16101611

1611-
func appendIteratorsReversed(out []y.Iterator, th []*table.Table, opt int) []y.Iterator {
1612+
func iteratorsReversed(th []*table.Table, opt int) []y.Iterator {
1613+
out := make([]y.Iterator, 0, len(th))
16121614
for i := len(th) - 1; i >= 0; i-- {
16131615
// This will increment the reference of the table handler.
16141616
out = append(out, th[i].NewIterator(opt))
16151617
}
16161618
return out
16171619
}
16181620

1619-
// appendIterators appends iterators to an array of iterators, for merging.
1621+
// getTables return tables from all levels. It would call IncrRef on all returned tables.
1622+
func (s *levelsController) getTables(opt *IteratorOptions) [][]*table.Table {
1623+
res := make([][]*table.Table, 0, len(s.levels))
1624+
for _, level := range s.levels {
1625+
res = append(res, level.getTables(opt))
1626+
}
1627+
return res
1628+
}
1629+
1630+
// iterators returns an array of iterators, for merging.
16201631
// Note: This obtains references for the table handlers. Remember to close these iterators.
1621-
func (s *levelsController) appendIterators(
1622-
iters []y.Iterator, opt *IteratorOptions) []y.Iterator {
1632+
func (s *levelsController) iterators(opt *IteratorOptions) []y.Iterator {
16231633
// Just like with get, it's important we iterate the levels from 0 on upward, to avoid missing
16241634
// data when there's a compaction.
1635+
itrs := make([]y.Iterator, 0, len(s.levels))
16251636
for _, level := range s.levels {
1626-
iters = level.appendIterators(iters, opt)
1637+
itrs = append(itrs, level.iterators(opt)...)
16271638
}
1628-
return iters
1639+
return itrs
16291640
}
16301641

16311642
// TableInfo represents the information about a table.
@@ -1752,3 +1763,46 @@ func (s *levelsController) keySplits(numPerTable int, prefix []byte) []string {
17521763
sort.Strings(splits)
17531764
return splits
17541765
}
1766+
1767+
// AddTable builds the table from the KV.value options passed through the KV.Key.
1768+
func (lc *levelsController) AddTable(
1769+
kv *pb.KV, lev int, dk *pb.DataKey, change *pb.ManifestChange) error {
1770+
// TODO: Encryption / Decryption might be required for the table, if the sender and receiver
1771+
// don't have same encryption mode. See if inplace encryption/decryption can be done.
1772+
// Tables are sent in the sorted order, so no need to sort them here.
1773+
encrypted := len(lc.kv.opt.EncryptionKey) > 0
1774+
y.AssertTrue((dk != nil && encrypted) || (dk == nil && !encrypted))
1775+
// The keyId is zero if there is no encryption.
1776+
opts := buildTableOptions(lc.kv)
1777+
opts.Compression = options.CompressionType(change.Compression)
1778+
opts.DataKey = dk
1779+
1780+
fileID := lc.reserveFileID()
1781+
fname := table.NewFilename(fileID, lc.kv.opt.Dir)
1782+
1783+
// kv.Value is owned by the z.buffer. Ensure that we copy this buffer.
1784+
var tbl *table.Table
1785+
var err error
1786+
if lc.kv.opt.InMemory {
1787+
if tbl, err = table.OpenInMemoryTable(y.Copy(kv.Value), fileID, &opts); err != nil {
1788+
return errors.Wrap(err, "while creating in-memory table from buffer")
1789+
}
1790+
} else {
1791+
if tbl, err = table.CreateTableFromBuffer(fname, kv.Value, opts); err != nil {
1792+
return errors.Wrap(err, "while creating table from buffer")
1793+
}
1794+
}
1795+
1796+
lc.levels[lev].addTable(tbl)
1797+
// Release the ref held by OpenTable. addTable would add a reference.
1798+
_ = tbl.DecrRef()
1799+
1800+
change.Id = fileID
1801+
change.Level = uint32(lev)
1802+
if dk != nil {
1803+
change.KeyId = dk.KeyId
1804+
}
1805+
// We use the same data KeyId. So, change.KeyId remains the same.
1806+
y.AssertTrue(change.Op == pb.ManifestChange_CREATE)
1807+
return lc.kv.manifest.addChanges([]*pb.ManifestChange{change})
1808+
}

0 commit comments

Comments
 (0)