
Commit 16131ef

manishrjain authored and mangalaman93 committed
opt(stream): add option to directly copy over tables from lower levels (#1700)
This PR adds a FullCopy option to Stream, which allows sending tables in their entirety to the writer. When the option is set to true, tables from the last two levels are copied over directly. This increases stream speed while also lowering memory consumption on the DB that is streaming the KVs. For a 71 GB compressed and encrypted DB we observed a 3x improvement in speed; that DB held ~65 GB in the last two levels, with the remainder in the levels above.

To use this option, the following must be set on the Stream (a usage sketch follows the file stats below):

stream.KeyToList = nil
stream.ChooseKey = nil
stream.SinceTs = 0
db.managedTxns = true

If a StreamWriter is used to receive the KVs, the sender and receiver must use the same encryption mode. This restricts db.StreamDB() to using the same encryption mode for both the input and output DB. A TODO has been added for allowing different encryption modes.
1 parent a674173 commit 16131ef

14 files changed: +702 -159 lines
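For context, here is a minimal usage sketch (not part of this commit's diff) of how FullCopy might be wired up to copy one managed DB into another through a StreamWriter. The directory paths are placeholders, both DBs are opened without encryption, and error handling is abbreviated; with encryption enabled, sender and receiver would have to use the same mode, as noted above.

package main

import (
	"context"
	"log"
	"math"

	badger "github.com/dgraph-io/badger/v3"
	"github.com/dgraph-io/ristretto/z"
)

func main() {
	// FullCopy needs managed transactions on the source DB.
	in, err := badger.OpenManaged(badger.DefaultOptions("/tmp/badger-src"))
	if err != nil {
		log.Fatal(err)
	}
	defer in.Close()

	// The receiving DB must use the same encryption mode as the sender.
	out, err := badger.OpenManaged(badger.DefaultOptions("/tmp/badger-dst"))
	if err != nil {
		log.Fatal(err)
	}
	defer out.Close()

	writer := out.NewStreamWriter()
	if err := writer.Prepare(); err != nil {
		log.Fatal(err)
	}

	stream := in.NewStreamAt(math.MaxUint64)
	stream.LogPrefix = "Full copy"
	// FullCopy requires KeyToList == nil, ChooseKey == nil and SinceTs == 0,
	// which are the defaults for a freshly created Stream.
	stream.FullCopy = true
	stream.Send = func(buf *z.Buffer) error {
		return writer.Write(buf)
	}
	if err := stream.Orchestrate(context.Background()); err != nil {
		log.Fatal(err)
	}
	if err := writer.Flush(); err != nil {
		log.Fatal(err)
	}
}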

badger/cmd/stream.go

+1
@@ -117,6 +117,7 @@ func stream(cmd *cobra.Command, args []string) error {
 		WithValueDir(so.outDir).
 		WithNumVersionsToKeep(so.numVersions).
 		WithCompression(options.CompressionType(so.compressionType)).
+		WithEncryptionKey(encKey).
 		WithReadOnly(false)
 	err = inDB.StreamDB(outOpt)


db.go

+1
@@ -2016,6 +2016,7 @@ func (db *DB) StreamDB(outOptions Options) error {
 	// Stream contents of DB to the output DB.
 	stream := db.NewStreamAt(math.MaxUint64)
 	stream.LogPrefix = fmt.Sprintf("Streaming DB to new DB at %s", outDir)
+	stream.FullCopy = true

 	stream.Send = func(buf *z.Buffer) error {
 		return writer.Write(buf)

iterator.go

+10 -10
@@ -368,17 +368,17 @@ func (opt *IteratorOptions) pickTable(t table.TableInterface) bool {
 // that the tables are sorted in the right order.
 func (opt *IteratorOptions) pickTables(all []*table.Table) []*table.Table {
 	filterTables := func(tables []*table.Table) []*table.Table {
-		if opt.SinceTs > 0 {
-			tmp := tables[:0]
-			for _, t := range tables {
-				if t.MaxVersion() < opt.SinceTs {
-					continue
-				}
-				tmp = append(tmp, t)
+		if opt.SinceTs == 0 {
+			return tables
+		}
+		out := tables[:0]
+		for _, t := range tables {
+			if t.MaxVersion() < opt.SinceTs {
+				continue
 			}
-			tables = tmp
+			out = append(out, t)
 		}
-		return tables
+		return out
 	}

 	if len(opt.Prefix) == 0 {
@@ -492,7 +492,7 @@ func (txn *Txn) NewIterator(opt IteratorOptions) *Iterator {
 	for i := 0; i < len(tables); i++ {
 		iters = append(iters, tables[i].sl.NewUniIterator(opt.Reverse))
 	}
-	iters = txn.db.lc.appendIterators(iters, &opt) // This will increment references.
+	iters = append(iters, txn.db.lc.iterators(&opt)...) // This will increment references.
 	res := &Iterator{
 		txn: txn,
 		iitr: table.NewMergeIterator(iters, opt.Reverse),

key_registry.go

+46 -39
@@ -30,6 +30,7 @@ import (

 	"github.com/dgraph-io/badger/v3/pb"
 	"github.com/dgraph-io/badger/v3/y"
+	"github.com/pkg/errors"
 )

 const (
@@ -264,7 +265,7 @@ func WriteKeyRegistry(reg *KeyRegistry, opt KeyRegistryOptions) error {
 	// Write all the datakeys to the buf.
 	for _, k := range reg.dataKeys {
 		// Writing the datakey to the given buffer.
-		if err := storeDataKey(buf, opt.EncryptionKey, k); err != nil {
+		if err := storeDataKey(buf, opt.EncryptionKey, *k); err != nil {
 			return y.Wrapf(err, "Error while storing datakey in WriteKeyRegistry")
 		}
 	}
@@ -338,44 +339,58 @@ func (kr *KeyRegistry) LatestDataKey() (*pb.DataKey, error) {
 	defer kr.Unlock()
 	// Key might have generated by another go routine. So,
 	// checking once again.
-	key, valid = validKey()
-	if valid {
+	if key, valid := validKey(); valid {
 		return key, nil
 	}
 	k := make([]byte, len(kr.opt.EncryptionKey))
 	iv, err := y.GenerateIV()
 	if err != nil {
 		return nil, err
 	}
-	_, err = rand.Read(k)
-	if err != nil {
+
+	if _, err = rand.Read(k); err != nil {
 		return nil, err
 	}
 	// Otherwise Increment the KeyID and generate new datakey.
 	kr.nextKeyID++
-	dk := &pb.DataKey{
+	dk := pb.DataKey{
 		KeyId: kr.nextKeyID,
 		Data: k,
 		CreatedAt: time.Now().Unix(),
 		Iv: iv,
 	}
+	kr.lastCreated = dk.CreatedAt
+	kr.dataKeys[kr.nextKeyID] = &dk
 	// Don't store the datakey on file if badger is running in InMemory mode.
-	if !kr.opt.InMemory {
-		// Store the datekey.
-		buf := &bytes.Buffer{}
-		if err = storeDataKey(buf, kr.opt.EncryptionKey, dk); err != nil {
-			return nil, err
-		}
-		// Persist the datakey to the disk
-		if _, err = kr.fp.Write(buf.Bytes()); err != nil {
-			return nil, err
-		}
+	if kr.opt.InMemory {
+		return &dk, nil

 	}
-	// storeDatakey encrypts the datakey So, placing un-encrypted key in the memory.
-	dk.Data = k
-	kr.lastCreated = dk.CreatedAt
-	kr.dataKeys[kr.nextKeyID] = dk
-	return dk, nil
+	// Store the datekey.
+	if err = storeDataKey(kr.fp, kr.opt.EncryptionKey, dk); err != nil {
+		return nil, err
+	}
+	return &dk, nil
+}
+
+func (kr *KeyRegistry) AddKey(dk pb.DataKey) (uint64, error) {
+	// If we don't have a encryption key, we cannot store the datakey.
+	if len(kr.opt.EncryptionKey) == 0 {
+		return 0, errors.New("No encryption key found. Cannot add data key")
+	}
+
+	if _, ok := kr.dataKeys[dk.KeyId]; !ok {
+		// If KeyId does not exists already, then use the next available KeyId to store data key.
+		kr.nextKeyID++
+		dk.KeyId = kr.nextKeyID
+	}
+	kr.dataKeys[dk.KeyId] = &dk
+
+	if kr.opt.InMemory {
+		return dk.KeyId, nil
+	}
+	// Store the datakey.
+	return dk.KeyId, storeDataKey(kr.fp, kr.opt.EncryptionKey, dk)
 }

 // Close closes the key registry.
@@ -387,38 +402,30 @@ func (kr *KeyRegistry) Close() error {
 }

 // storeDataKey stores datakey in an encrypted format in the given buffer. If storage key preset.
-func storeDataKey(buf *bytes.Buffer, storageKey []byte, k *pb.DataKey) error {
+// DO NOT use a pointer for key. storeDataKey modifies the kv.Data field.
+func storeDataKey(w io.Writer, storageKey []byte, key pb.DataKey) error {
 	// xor will encrypt the IV and xor with the given data.
 	// It'll used for both encryption and decryption.
 	xor := func() error {
 		if len(storageKey) == 0 {
 			return nil
 		}
 		var err error
-		k.Data, err = y.XORBlockAllocate(k.Data, storageKey, k.Iv)
+		key.Data, err = y.XORBlockAllocate(key.Data, storageKey, key.Iv)
 		return err
 	}
 	// In memory datakey will be plain text so encrypting before storing to the disk.
-	var err error
-	if err = xor(); err != nil {
+	if err := xor(); err != nil {
 		return y.Wrapf(err, "Error while encrypting datakey in storeDataKey")
 	}
-	var data []byte
-	if data, err = k.Marshal(); err != nil {
-		err = y.Wrapf(err, "Error while marshaling datakey in storeDataKey")
-		var err2 error
-		// decrypting the datakey back.
-		if err2 = xor(); err2 != nil {
-			return y.Wrapf(err,
-				y.Wrapf(err2, "Error while decrypting datakey in storeDataKey").Error())
-		}
-		return err
+	data, err := key.Marshal()
+	if err != nil {
+		return y.Wrapf(err, "Error while marshaling datakey in storeDataKey")
 	}
 	var lenCrcBuf [8]byte
 	binary.BigEndian.PutUint32(lenCrcBuf[0:4], uint32(len(data)))
 	binary.BigEndian.PutUint32(lenCrcBuf[4:8], crc32.Checksum(data, y.CastagnoliCrcTable))
-	y.Check2(buf.Write(lenCrcBuf[:]))
-	y.Check2(buf.Write(data))
-	// Decrypting the datakey back since we're using the pointer.
-	return xor()
+	y.Check2(w.Write(lenCrcBuf[:]))
+	y.Check2(w.Write(data))
+	return nil
 }
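The "DO NOT use a pointer for key" comment on the new storeDataKey signature is the crux of this refactor: because the DataKey is now passed by value and y.XORBlockAllocate returns a freshly allocated slice, reassigning key.Data inside the function never touches the caller's in-memory copy, so the old "decrypt the key back after writing" dance and its error handling disappear. A standalone sketch of that mechanic, using hypothetical names (dataKey, encryptForDisk) rather than badger's types:

package main

import "fmt"

type dataKey struct {
	Data []byte
}

// encryptForDisk stands in for the XOR/marshal step: it allocates a new
// ciphertext slice (as y.XORBlockAllocate does) and reassigns k.Data to it.
// Because k is a value, the reassignment only affects this local copy.
func encryptForDisk(k dataKey) []byte {
	cipher := make([]byte, len(k.Data))
	for i, b := range k.Data {
		cipher[i] = b ^ 0xAA // toy "encryption", not a real cipher
	}
	k.Data = cipher
	return k.Data
}

func main() {
	dk := dataKey{Data: []byte("plaintext")}
	_ = encryptForDisk(dk)
	// The caller's copy is untouched, so no "decrypt back" step is needed.
	fmt.Printf("caller still sees: %q\n", dk.Data)
}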

level_handler.go

+32 -5
@@ -304,9 +304,9 @@ func (s *levelHandler) get(key []byte) (y.ValueStruct, error) {
 	return maxVs, decr()
 }

-// appendIterators appends iterators to an array of iterators, for merging.
+// iterators returns an array of iterators, for merging.
 // Note: This obtains references for the table handlers. Remember to close these iterators.
-func (s *levelHandler) appendIterators(iters []y.Iterator, opt *IteratorOptions) []y.Iterator {
+func (s *levelHandler) iterators(opt *IteratorOptions) []y.Iterator {
 	s.RLock()
 	defer s.RUnlock()

@@ -324,14 +324,41 @@ func (s *levelHandler) appendIterators(iters []y.Iterator, opt *IteratorOptions)
 				out = append(out, t)
 			}
 		}
-		return appendIteratorsReversed(iters, out, topt)
+		return iteratorsReversed(out, topt)
 	}

 	tables := opt.pickTables(s.tables)
 	if len(tables) == 0 {
-		return iters
+		return nil
 	}
-	return append(iters, table.NewConcatIterator(tables, topt))
+	return []y.Iterator{table.NewConcatIterator(tables, topt)}
+}
+
+func (s *levelHandler) getTables(opt *IteratorOptions) []*table.Table {
+	if opt.Reverse {
+		panic("Invalid option for getTables")
+	}
+
+	// Typically this would only be called for the last level.
+	s.RLock()
+	defer s.RUnlock()
+
+	if s.level == 0 {
+		var out []*table.Table
+		for _, t := range s.tables {
+			if opt.pickTable(t) {
+				t.IncrRef()
+				out = append(out, t)
+			}
+		}
+		return out
+	}
+
+	tables := opt.pickTables(s.tables)
+	for _, t := range tables {
+		t.IncrRef()
+	}
+	return tables
 }

 type levelHandlerRLocked struct{}
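The new levelHandler.getTables above, and the levelsController.getTables below, follow badger's table reference-counting contract: the function takes a reference (IncrRef) on every table it returns, and the caller must release it (DecrRef) when done. A self-contained sketch of that contract with illustrative names, not badger's actual Table type:

package main

import (
	"fmt"
	"sync/atomic"
)

// table stands in for a refcounted SSTable handle.
type table struct {
	id  int
	ref int32 // 1 while the level holds the file open
}

func (t *table) IncrRef()       { atomic.AddInt32(&t.ref, 1) }
func (t *table) DecrRef() int32 { return atomic.AddInt32(&t.ref, -1) }

// getTables mirrors the pattern in the diff: bump the refcount on behalf of
// the caller before handing the tables out.
func getTables(all []*table) []*table {
	out := make([]*table, 0, len(all))
	for _, t := range all {
		t.IncrRef()
		out = append(out, t)
	}
	return out
}

func main() {
	level := []*table{{id: 1, ref: 1}, {id: 2, ref: 1}}
	held := getTables(level)
	for _, t := range held {
		fmt.Printf("table %d refcount while held: %d\n", t.id, atomic.LoadInt32(&t.ref))
		t.DecrRef() // caller releases its reference when done
	}
}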

levels.go

+61 -7
@@ -32,6 +32,7 @@ import (

 	otrace "go.opencensus.io/trace"

+	"github.com/dgraph-io/badger/v3/options"
 	"github.com/dgraph-io/badger/v3/pb"
 	"github.com/dgraph-io/badger/v3/table"
 	"github.com/dgraph-io/badger/v3/y"
@@ -895,7 +896,7 @@ func (s *levelsController) compactBuildTables(
 	var iters []y.Iterator
 	switch {
 	case lev == 0:
-		iters = appendIteratorsReversed(iters, topTables, table.NOCACHE)
+		iters = append(iters, iteratorsReversed(topTables, table.NOCACHE)...)
 	case len(topTables) > 0:
 		y.AssertTrue(len(topTables) == 1)
 		iters = []y.Iterator{topTables[0].NewIterator(table.NOCACHE)}
@@ -1615,24 +1616,34 @@ func (s *levelsController) get(key []byte, maxVs y.ValueStruct, startLevel int)
 	return maxVs, nil
 }

-func appendIteratorsReversed(out []y.Iterator, th []*table.Table, opt int) []y.Iterator {
+func iteratorsReversed(th []*table.Table, opt int) []y.Iterator {
+	out := make([]y.Iterator, 0, len(th))
 	for i := len(th) - 1; i >= 0; i-- {
 		// This will increment the reference of the table handler.
 		out = append(out, th[i].NewIterator(opt))
 	}
 	return out
 }

-// appendIterators appends iterators to an array of iterators, for merging.
+// getTables return tables from all levels. It would call IncrRef on all returned tables.
+func (s *levelsController) getTables(opt *IteratorOptions) [][]*table.Table {
+	res := make([][]*table.Table, 0, len(s.levels))
+	for _, level := range s.levels {
+		res = append(res, level.getTables(opt))
+	}
+	return res
+}
+
+// iterators returns an array of iterators, for merging.
 // Note: This obtains references for the table handlers. Remember to close these iterators.
-func (s *levelsController) appendIterators(
-	iters []y.Iterator, opt *IteratorOptions) []y.Iterator {
+func (s *levelsController) iterators(opt *IteratorOptions) []y.Iterator {
 	// Just like with get, it's important we iterate the levels from 0 on upward, to avoid missing
 	// data when there's a compaction.
+	itrs := make([]y.Iterator, 0, len(s.levels))
 	for _, level := range s.levels {
-		iters = level.appendIterators(iters, opt)
+		itrs = append(itrs, level.iterators(opt)...)
 	}
-	return iters
+	return itrs
 }

 // TableInfo represents the information about a table.
@@ -1759,3 +1770,46 @@ func (s *levelsController) keySplits(numPerTable int, prefix []byte) []string {
 	sort.Strings(splits)
 	return splits
 }
+
+// AddTable builds the table from the KV.value options passed through the KV.Key.
+func (lc *levelsController) AddTable(
+	kv *pb.KV, lev int, dk *pb.DataKey, change *pb.ManifestChange) error {
+	// TODO: Encryption / Decryption might be required for the table, if the sender and receiver
+	// don't have same encryption mode. See if inplace encryption/decryption can be done.
+	// Tables are sent in the sorted order, so no need to sort them here.
+	encrypted := len(lc.kv.opt.EncryptionKey) > 0
+	y.AssertTrue((dk != nil && encrypted) || (dk == nil && !encrypted))
+	// The keyId is zero if there is no encryption.
+	opts := buildTableOptions(lc.kv)
+	opts.Compression = options.CompressionType(change.Compression)
+	opts.DataKey = dk
+
+	fileID := lc.reserveFileID()
+	fname := table.NewFilename(fileID, lc.kv.opt.Dir)
+
+	// kv.Value is owned by the z.buffer. Ensure that we copy this buffer.
+	var tbl *table.Table
+	var err error
+	if lc.kv.opt.InMemory {
+		if tbl, err = table.OpenInMemoryTable(y.Copy(kv.Value), fileID, &opts); err != nil {
+			return errors.Wrap(err, "while creating in-memory table from buffer")
+		}
+	} else {
+		if tbl, err = table.CreateTableFromBuffer(fname, kv.Value, opts); err != nil {
+			return errors.Wrap(err, "while creating table from buffer")
+		}
+	}
+
+	lc.levels[lev].addTable(tbl)
+	// Release the ref held by OpenTable. addTable would add a reference.
+	_ = tbl.DecrRef()
+
+	change.Id = fileID
+	change.Level = uint32(lev)
+	if dk != nil {
+		change.KeyId = dk.KeyId
+	}
+	// We use the same data KeyId. So, change.KeyId remains the same.
+	y.AssertTrue(change.Op == pb.ManifestChange_CREATE)
+	return lc.kv.manifest.addChanges([]*pb.ManifestChange{change})
+}
