Commit 1f99a5c

opt(stream): add option to directly copy over tables from lower levels (#1700)
This PR adds a FullCopy option to Stream. It allows sending tables to the writer in their entirety: when the option is set to true, tables from the last two levels are copied over directly. This increases stream speed while also lowering memory consumption on the DB that is streaming the KVs. For a 71 GB compressed and encrypted DB we observed a 3x improvement in speed; the DB held ~65 GB in the last two levels, with the rest in the levels above.

To use this option, the following must be set:

stream.KeyToList = nil
stream.ChooseKey = nil
stream.SinceTs = 0
db.managedTxns = true

If a stream writer is used to receive the KVs, the sender and the receiver must use the same encryption mode. This restricts db.StreamDB() to using the same encryption mode for both the input and the output DB. A TODO has been added for allowing different encryption modes.
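For illustration only (not part of the commit), here is a minimal sketch of how the option might be wired up, assuming both DBs are opened in managed mode (badger.OpenManaged), both use the same encryption mode, and the receiving side uses a StreamWriter; the fullCopy helper and package name below are hypothetical:

package streamcopy

import (
    "context"
    "math"

    badger "github.com/dgraph-io/badger/v3"
    "github.com/dgraph-io/ristretto/z"
)

// fullCopy streams the entire contents of in into out, letting Badger hand
// tables from the last two levels to the writer without re-iterating them.
func fullCopy(in, out *badger.DB) error {
    writer := out.NewStreamWriter()
    if err := writer.Prepare(); err != nil {
        return err
    }

    stream := in.NewStreamAt(math.MaxUint64) // managed mode: read at the maximum version
    stream.LogPrefix = "FullCopy"
    stream.FullCopy = true
    // FullCopy relies on the defaults below being left untouched.
    stream.KeyToList = nil
    stream.ChooseKey = nil
    stream.SinceTs = 0

    stream.Send = func(buf *z.Buffer) error {
        return writer.Write(buf) // hand each batch straight to the destination DB
    }
    if err := stream.Orchestrate(context.Background()); err != nil {
        return err
    }
    return writer.Flush()
}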
1 parent 5c374e2 commit 1f99a5c

13 files changed: +697 −157 lines

badger/cmd/stream.go (+1)

@@ -118,6 +118,7 @@ func stream(cmd *cobra.Command, args []string) error {
         WithValueDir(so.outDir).
         WithNumVersionsToKeep(so.numVersions).
         WithCompression(options.CompressionType(so.compressionType)).
+        WithEncryptionKey(encKey).
         WithReadOnly(false)
     err = inDB.StreamDB(outOpt)

db.go (+1)

@@ -2011,6 +2011,7 @@ func (db *DB) StreamDB(outOptions Options) error {
     // Stream contents of DB to the output DB.
     stream := db.NewStreamAt(math.MaxUint64)
     stream.LogPrefix = fmt.Sprintf("Streaming DB to new DB at %s", outDir)
+    stream.FullCopy = true

     stream.Send = func(buf *z.Buffer) error {
         return writer.Write(buf)

iterator.go (+10 −10)

@@ -367,17 +367,17 @@ func (opt *IteratorOptions) pickTable(t table.TableInterface) bool {
 // that the tables are sorted in the right order.
 func (opt *IteratorOptions) pickTables(all []*table.Table) []*table.Table {
     filterTables := func(tables []*table.Table) []*table.Table {
-        if opt.SinceTs > 0 {
-            tmp := tables[:0]
-            for _, t := range tables {
-                if t.MaxVersion() < opt.SinceTs {
-                    continue
-                }
-                tmp = append(tmp, t)
+        if opt.SinceTs == 0 {
+            return tables
+        }
+        out := tables[:0]
+        for _, t := range tables {
+            if t.MaxVersion() < opt.SinceTs {
+                continue
             }
-            tables = tmp
+            out = append(out, t)
         }
-        return tables
+        return out
     }

     if len(opt.Prefix) == 0 {

@@ -491,7 +491,7 @@ func (txn *Txn) NewIterator(opt IteratorOptions) *Iterator {
     for i := 0; i < len(tables); i++ {
         iters = append(iters, tables[i].sl.NewUniIterator(opt.Reverse))
     }
-    iters = txn.db.lc.appendIterators(iters, &opt) // This will increment references.
+    iters = append(iters, txn.db.lc.iterators(&opt)...) // This will increment references.
     res := &Iterator{
         txn:  txn,
         iitr: table.NewMergeIterator(iters, opt.Reverse),

key_registry.go (+47 −39)

@@ -28,6 +28,8 @@ import (
     "sync"
     "time"

+    "github.com/pkg/errors"
+
     "github.com/dgraph-io/badger/v3/pb"
     "github.com/dgraph-io/badger/v3/y"
 )

@@ -264,7 +266,7 @@ func WriteKeyRegistry(reg *KeyRegistry, opt KeyRegistryOptions) error {
     // Write all the datakeys to the buf.
     for _, k := range reg.dataKeys {
         // Writing the datakey to the given buffer.
-        if err := storeDataKey(buf, opt.EncryptionKey, k); err != nil {
+        if err := storeDataKey(buf, opt.EncryptionKey, *k); err != nil {
             return y.Wrapf(err, "Error while storing datakey in WriteKeyRegistry")
         }
     }

@@ -338,44 +340,58 @@ func (kr *KeyRegistry) LatestDataKey() (*pb.DataKey, error) {
     defer kr.Unlock()
     // Key might have generated by another go routine. So,
     // checking once again.
-    key, valid = validKey()
-    if valid {
+    if key, valid := validKey(); valid {
         return key, nil
     }
     k := make([]byte, len(kr.opt.EncryptionKey))
     iv, err := y.GenerateIV()
     if err != nil {
         return nil, err
     }
-    _, err = rand.Read(k)
-    if err != nil {
+
+    if _, err := rand.Read(k); err != nil {
         return nil, err
     }
     // Otherwise Increment the KeyID and generate new datakey.
     kr.nextKeyID++
-    dk := &pb.DataKey{
+    dk := pb.DataKey{
         KeyId:     kr.nextKeyID,
         Data:      k,
         CreatedAt: time.Now().Unix(),
         Iv:        iv,
     }
+    kr.lastCreated = dk.CreatedAt
+    kr.dataKeys[kr.nextKeyID] = &dk
     // Don't store the datakey on file if badger is running in InMemory mode.
-    if !kr.opt.InMemory {
-        // Store the datekey.
-        buf := &bytes.Buffer{}
-        if err = storeDataKey(buf, kr.opt.EncryptionKey, dk); err != nil {
-            return nil, err
-        }
-        // Persist the datakey to the disk
-        if _, err = kr.fp.Write(buf.Bytes()); err != nil {
-            return nil, err
-        }
+    if kr.opt.InMemory {
+        return &dk, nil
+
     }
-    // storeDatakey encrypts the datakey So, placing un-encrypted key in the memory.
-    dk.Data = k
-    kr.lastCreated = dk.CreatedAt
-    kr.dataKeys[kr.nextKeyID] = dk
-    return dk, nil
+    // Store the datekey.
+    if err = storeDataKey(kr.fp, kr.opt.EncryptionKey, dk); err != nil {
+        return nil, err
+    }
+    return &dk, nil
+}
+
+func (kr *KeyRegistry) AddKey(dk pb.DataKey) (uint64, error) {
+    // If we don't have a encryption key, we cannot store the datakey.
+    if len(kr.opt.EncryptionKey) == 0 {
+        return 0, errors.New("No encryption key found. Cannot add data key")
+    }
+
+    if _, ok := kr.dataKeys[dk.KeyId]; !ok {
+        // If KeyId does not exists already, then use the next available KeyId to store data key.
+        kr.nextKeyID++
+        dk.KeyId = kr.nextKeyID
+    }
+    kr.dataKeys[dk.KeyId] = &dk
+
+    if kr.opt.InMemory {
+        return dk.KeyId, nil
+    }
+    // Store the datakey.
+    return dk.KeyId, storeDataKey(kr.fp, kr.opt.EncryptionKey, dk)
 }

 // Close closes the key registry.

@@ -387,38 +403,30 @@ func (kr *KeyRegistry) Close() error {
 }

 // storeDataKey stores datakey in an encrypted format in the given buffer. If storage key preset.
-func storeDataKey(buf *bytes.Buffer, storageKey []byte, k *pb.DataKey) error {
+// DO NOT use a pointer for key. storeDataKey modifies the kv.Data field.
+func storeDataKey(w io.Writer, storageKey []byte, key pb.DataKey) error {
     // xor will encrypt the IV and xor with the given data.
     // It'll used for both encryption and decryption.
     xor := func() error {
         if len(storageKey) == 0 {
             return nil
         }
         var err error
-        k.Data, err = y.XORBlockAllocate(k.Data, storageKey, k.Iv)
+        key.Data, err = y.XORBlockAllocate(key.Data, storageKey, key.Iv)
         return err
     }
     // In memory datakey will be plain text so encrypting before storing to the disk.
-    var err error
-    if err = xor(); err != nil {
+    if err := xor(); err != nil {
         return y.Wrapf(err, "Error while encrypting datakey in storeDataKey")
     }
-    var data []byte
-    if data, err = k.Marshal(); err != nil {
-        err = y.Wrapf(err, "Error while marshaling datakey in storeDataKey")
-        var err2 error
-        // decrypting the datakey back.
-        if err2 = xor(); err2 != nil {
-            return y.Wrapf(err,
-                y.Wrapf(err2, "Error while decrypting datakey in storeDataKey").Error())
-        }
-        return err
+    data, err := key.Marshal()
+    if err != nil {
+        return y.Wrapf(err, "Error while marshaling datakey in storeDataKey")
     }
     var lenCrcBuf [8]byte
     binary.BigEndian.PutUint32(lenCrcBuf[0:4], uint32(len(data)))
     binary.BigEndian.PutUint32(lenCrcBuf[4:8], crc32.Checksum(data, y.CastagnoliCrcTable))
-    y.Check2(buf.Write(lenCrcBuf[:]))
-    y.Check2(buf.Write(data))
-    // Decrypting the datakey back since we're using the pointer.
-    return xor()
+    y.Check2(w.Write(lenCrcBuf[:]))
+    y.Check2(w.Write(data))
+    return nil
 }
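An aside on the storeDataKey signature change above: because the DataKey is now passed by value and y.XORBlockAllocate assigns a freshly allocated slice to key.Data, the assignment only rebinds the copy's slice header, so the caller's in-memory plaintext key stays intact and the old decrypt-back step is unnecessary. A minimal stand-alone illustration of that value-semantics point (not Badger code; names are hypothetical):

package main

import "fmt"

// dataKey stands in for pb.DataKey in this illustration.
type dataKey struct {
    Data []byte
}

// encryptForStorage mimics storeDataKey's xor step: it assigns a newly
// allocated slice to Data on its own copy of the struct, so the caller's
// value is not touched.
func encryptForStorage(k dataKey) {
    k.Data = append([]byte("enc:"), k.Data...)
}

func main() {
    k := dataKey{Data: []byte("plaintext")}
    encryptForStorage(k)
    fmt.Printf("%s\n", k.Data) // prints "plaintext": the original key is unchanged
}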

level_handler.go (+32 −5)

@@ -304,9 +304,9 @@ func (s *levelHandler) get(key []byte) (y.ValueStruct, error) {
     return maxVs, decr()
 }

-// appendIterators appends iterators to an array of iterators, for merging.
+// iterators returns an array of iterators, for merging.
 // Note: This obtains references for the table handlers. Remember to close these iterators.
-func (s *levelHandler) appendIterators(iters []y.Iterator, opt *IteratorOptions) []y.Iterator {
+func (s *levelHandler) iterators(opt *IteratorOptions) []y.Iterator {
     s.RLock()
     defer s.RUnlock()

@@ -324,14 +324,41 @@ func (s *levelHandler) appendIterators(iters []y.Iterator, opt *IteratorOptions)
                 out = append(out, t)
             }
         }
-        return appendIteratorsReversed(iters, out, topt)
+        return iteratorsReversed(out, topt)
     }

     tables := opt.pickTables(s.tables)
     if len(tables) == 0 {
-        return iters
+        return nil
     }
-    return append(iters, table.NewConcatIterator(tables, topt))
+    return []y.Iterator{table.NewConcatIterator(tables, topt)}
+}
+
+func (s *levelHandler) getTables(opt *IteratorOptions) []*table.Table {
+    if opt.Reverse {
+        panic("Invalid option for getTables")
+    }
+
+    // Typically this would only be called for the last level.
+    s.RLock()
+    defer s.RUnlock()
+
+    if s.level == 0 {
+        var out []*table.Table
+        for _, t := range s.tables {
+            if opt.pickTable(t) {
+                t.IncrRef()
+                out = append(out, t)
+            }
+        }
+        return out
+    }
+
+    tables := opt.pickTables(s.tables)
+    for _, t := range tables {
+        t.IncrRef()
+    }
+    return tables
 }

 type levelHandlerRLocked struct{}

levels.go (+61 −7)

@@ -33,6 +33,7 @@ import (
     "github.com/pkg/errors"
     otrace "go.opencensus.io/trace"

+    "github.com/dgraph-io/badger/v3/options"
     "github.com/dgraph-io/badger/v3/pb"
     "github.com/dgraph-io/badger/v3/table"
     "github.com/dgraph-io/badger/v3/y"

@@ -895,7 +896,7 @@ func (s *levelsController) compactBuildTables(
     var iters []y.Iterator
     switch {
     case lev == 0:
-        iters = appendIteratorsReversed(iters, topTables, table.NOCACHE)
+        iters = append(iters, iteratorsReversed(topTables, table.NOCACHE)...)
     case len(topTables) > 0:
         y.AssertTrue(len(topTables) == 1)
         iters = []y.Iterator{topTables[0].NewIterator(table.NOCACHE)}

@@ -1609,24 +1610,34 @@ func (s *levelsController) get(key []byte, maxVs y.ValueStruct, startLevel int)
     return maxVs, nil
 }

-func appendIteratorsReversed(out []y.Iterator, th []*table.Table, opt int) []y.Iterator {
+func iteratorsReversed(th []*table.Table, opt int) []y.Iterator {
+    out := make([]y.Iterator, 0, len(th))
     for i := len(th) - 1; i >= 0; i-- {
         // This will increment the reference of the table handler.
         out = append(out, th[i].NewIterator(opt))
     }
     return out
 }

-// appendIterators appends iterators to an array of iterators, for merging.
+// getTables return tables from all levels. It would call IncrRef on all returned tables.
+func (s *levelsController) getTables(opt *IteratorOptions) [][]*table.Table {
+    res := make([][]*table.Table, 0, len(s.levels))
+    for _, level := range s.levels {
+        res = append(res, level.getTables(opt))
+    }
+    return res
+}
+
+// iterators returns an array of iterators, for merging.
 // Note: This obtains references for the table handlers. Remember to close these iterators.
-func (s *levelsController) appendIterators(
-    iters []y.Iterator, opt *IteratorOptions) []y.Iterator {
+func (s *levelsController) iterators(opt *IteratorOptions) []y.Iterator {
     // Just like with get, it's important we iterate the levels from 0 on upward, to avoid missing
     // data when there's a compaction.
+    itrs := make([]y.Iterator, 0, len(s.levels))
     for _, level := range s.levels {
-        iters = level.appendIterators(iters, opt)
+        itrs = append(itrs, level.iterators(opt)...)
     }
-    return iters
+    return itrs
 }

 // TableInfo represents the information about a table.

@@ -1753,3 +1764,46 @@ func (s *levelsController) keySplits(numPerTable int, prefix []byte) []string {
     sort.Strings(splits)
     return splits
 }
+
+// AddTable builds the table from the KV.value options passed through the KV.Key.
+func (lc *levelsController) AddTable(
+    kv *pb.KV, lev int, dk *pb.DataKey, change *pb.ManifestChange) error {
+    // TODO: Encryption / Decryption might be required for the table, if the sender and receiver
+    // don't have same encryption mode. See if inplace encryption/decryption can be done.
+    // Tables are sent in the sorted order, so no need to sort them here.
+    encrypted := len(lc.kv.opt.EncryptionKey) > 0
+    y.AssertTrue((dk != nil && encrypted) || (dk == nil && !encrypted))
+    // The keyId is zero if there is no encryption.
+    opts := buildTableOptions(lc.kv)
+    opts.Compression = options.CompressionType(change.Compression)
+    opts.DataKey = dk
+
+    fileID := lc.reserveFileID()
+    fname := table.NewFilename(fileID, lc.kv.opt.Dir)
+
+    // kv.Value is owned by the z.buffer. Ensure that we copy this buffer.
+    var tbl *table.Table
+    var err error
+    if lc.kv.opt.InMemory {
+        if tbl, err = table.OpenInMemoryTable(y.Copy(kv.Value), fileID, &opts); err != nil {
+            return errors.Wrap(err, "while creating in-memory table from buffer")
+        }
+    } else {
+        if tbl, err = table.CreateTableFromBuffer(fname, kv.Value, opts); err != nil {
+            return errors.Wrap(err, "while creating table from buffer")
+        }
+    }
+
+    lc.levels[lev].addTable(tbl)
+    // Release the ref held by OpenTable. addTable would add a reference.
+    _ = tbl.DecrRef()
+
+    change.Id = fileID
+    change.Level = uint32(lev)
+    if dk != nil {
+        change.KeyId = dk.KeyId
+    }
+    // We use the same data KeyId. So, change.KeyId remains the same.
+    y.AssertTrue(change.Op == pb.ManifestChange_CREATE)
+    return lc.kv.manifest.addChanges([]*pb.ManifestChange{change})
+}
