
Compress/Encrypt Blocks in background #1227

Merged Mar 12, 2020 · 36 commits (the diff below shows changes from 16 of them)

Commits
ee65856
Doesn't work
Jan 31, 2020
72d1932
Everything works but has race
Feb 11, 2020
ebbb23e
Everything works but super slow
Feb 11, 2020
62cda51
Everything works but slower than master
Feb 12, 2020
7230da4
Reuse encode buffer
Feb 12, 2020
277b0af
Reuse pool for decompression and []byte in encode
Feb 12, 2020
15ae690
fix benchmark
Feb 12, 2020
93963ad
performance issues. One go routine faster than multiple go routines
Feb 12, 2020
37d0c0a
Dont use go routines for no compression/encryption
Feb 21, 2020
5795be5
All table tests work
Feb 21, 2020
b86d1b1
Reduce memory consumption
Feb 21, 2020
01f304e
fix test
Feb 21, 2020
da3c4c4
Cleanup
Feb 24, 2020
269bdbd
cleanup go.mod/sum
Feb 24, 2020
1f99b13
fixup
Feb 24, 2020
c141ebf
Merge branch 'master' into ibrahim/replace-b.buf
Feb 24, 2020
bd0ae87
Use locks to protect buf copy
Mar 2, 2020
d807f05
Use locks to protect buf copy
Mar 2, 2020
dece005
Merge branch 'ibrahim/replace-b.buf' of https://github.com/dgraph-io/…
Mar 2, 2020
30a4c31
Remove unused key from bblock
Mar 2, 2020
833bae4
Fix benchmark
Mar 2, 2020
7061c81
Fix slicepool usage
Mar 2, 2020
2fecb9d
Fix slicepool usage
Mar 2, 2020
8014669
fix review comments
Mar 3, 2020
95dffd2
Use dst to store final table
Mar 3, 2020
0a506f0
Reduce size of blockchan
Mar 3, 2020
f869acc
Disable all stream writer tests
Mar 3, 2020
34af954
Revert "Disable all stream writer tests"
Mar 4, 2020
ac6d2c0
Revert "fix review comments"
Mar 4, 2020
58185c6
Merge branch 'master' into ibrahim/replace-b.buf
Mar 4, 2020
19d9717
Revert "Reduce size of blockchan"
Mar 5, 2020
d1b850f
Revert "Use dst to store final table"
Mar 5, 2020
c6cd35f
Revert "Revert "Reduce size of blockchan""
Mar 5, 2020
10e0c2b
Revert "Revert "fix review comments""
Mar 5, 2020
6f2a0fc
Use dst instead of b.buf
Mar 6, 2020
1b4a04f
Cleanup
Mar 6, 2020
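
Overview, for orientation: this PR replaces the builder's bytes.Buffer with a flat preallocated []byte and hands each finished block to a pool of worker goroutines (one per CPU) that compress and/or encrypt it in the background; Finish() then waits for the workers, compacts the blocks, and appends the index. Below is a minimal, self-contained sketch of that fan-out/stitch pattern — the names, sizes, and snappy-only compression are illustrative, not Badger's actual API:

package main

import (
	"bytes"
	"fmt"
	"runtime"
	"sync"

	"github.com/golang/snappy"
)

// bblock marks one finished block: its raw contents and the
// compressed form a worker fills in.
type bblock struct {
	raw        []byte
	compressed []byte
}

func main() {
	blockChan := make(chan *bblock, 16)
	var wg sync.WaitGroup

	// Fan out: one worker per CPU, mirroring the builder's handleBlock.
	for i := 0; i < runtime.NumCPU(); i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for b := range blockChan {
				b.compressed = snappy.Encode(nil, b.raw)
			}
		}()
	}

	// Producer: remember block order (like b.blockList) while workers run.
	var blockList []*bblock
	for i := 0; i < 4; i++ {
		b := &bblock{raw: bytes.Repeat([]byte{byte(i)}, 4096)}
		blockList = append(blockList, b)
		blockChan <- b
	}
	close(blockChan)
	wg.Wait()

	// Stitch the table back together in original block order.
	var out []byte
	for _, b := range blockList {
		out = append(out, b.compressed...)
	}
	fmt.Println("table size:", len(out))
}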
9 changes: 3 additions & 6 deletions db2_test.go
@@ -669,16 +669,13 @@ func TestL0GCBug(t *testing.T) {
// Simulate a crash by not closing db1 but releasing the locks.
if db1.dirLockGuard != nil {
require.NoError(t, db1.dirLockGuard.release())
db1.dirLockGuard = nil
}
if db1.valueDirGuard != nil {
require.NoError(t, db1.valueDirGuard.release())
db1.valueDirGuard = nil
}
for _, f := range db1.vlog.filesMap {
require.NoError(t, f.fd.Close())
}
require.NoError(t, db1.registry.Close())
require.NoError(t, db1.lc.close())
require.NoError(t, db1.manifest.close())
require.NoError(t, db1.Close())

db2, err := Open(opts)
require.NoError(t, err)
1 change: 1 addition & 0 deletions options.go
@@ -155,6 +155,7 @@ func DefaultOptions(path string) Options {

func buildTableOptions(opt Options) table.Options {
return table.Options{
TableSize: uint64(opt.MaxTableSize),
BlockSize: opt.BlockSize,
BloomFalsePositive: opt.BloomFalsePositive,
LoadingMode: opt.TableLoadingMode,
212 changes: 153 additions & 59 deletions table/builder.go
@@ -20,6 +20,8 @@ import (
"bytes"
"crypto/aes"
"math"
"runtime"
"sync"
"unsafe"

"github.com/dgryski/go-farm"
@@ -33,6 +35,11 @@ import (
"github.com/dgraph-io/ristretto/z"
)

const (
KB = 1024
MB = KB * 1024
)

func newBuffer(sz int) *bytes.Buffer {
b := new(bytes.Buffer)
b.Grow(sz)
@@ -61,34 +68,98 @@ func (h *header) Decode(buf []byte) {
copy(((*[headerSize]byte)(unsafe.Pointer(h))[:]), buf[:headerSize])
}

type bblock struct {
data []byte
key []byte
start uint32 // Points to the starting offset of the block.
end uint32 // Points to the end offset of the block.
}

// Builder is used in building a table.
type Builder struct {
// Typically tens or hundreds of megabytes. This is for a single file.
buf *bytes.Buffer
buf []byte
sz int

baseKey []byte // Base key for the current block.
baseOffset uint32 // Offset for the current block.
entryOffsets []uint32 // Offsets of entries present in current block.
tableIndex *pb.TableIndex
keyHashes []uint64 // Used for building the bloomfilter.
opt *Options

// Used to concurrently compress/encrypt blocks.
wg sync.WaitGroup
blockChan chan *bblock
blockList []*bblock
}

// NewTableBuilder makes a new TableBuilder.
func NewTableBuilder(opts Options) *Builder {
return &Builder{
buf: newBuffer(1 << 20),
b := &Builder{
// Additional 2 MB to store index (approximate).
// We trim the additional space in table.Finish().
buf: make([]byte, opts.TableSize+MB*2),
tableIndex: &pb.TableIndex{},
keyHashes: make([]uint64, 0, 1024), // Avoid some malloc calls.
opt: &opts,
}

// If encryption or compression is not enabled, do not start compression/encryption goroutines
// and write directly to the buffer.
if b.opt.Compression == options.None && b.opt.DataKey == nil {
return b
}

b.blockChan = make(chan *bblock, 1000)

count := runtime.NumCPU()
b.wg.Add(count)
for i := 0; i < count; i++ {
go b.handleBlock()
}
return b
}

var slicePool = sync.Pool{New: func() interface{} { return make([]byte, 0, 100) }}

func (b *Builder) handleBlock() {
defer b.wg.Done()
for item := range b.blockChan {
// Extract the item
blockBuf := item.data[item.start:item.end]
var dst []byte
// Compress the block.
if b.opt.Compression != options.None {
var err error

dst = slicePool.Get().([]byte)
dst = dst[:0]

blockBuf, err = b.compressData(dst, blockBuf)
y.Check(err)
}
if b.shouldEncrypt() {
eBlock, err := b.encrypt(blockBuf)
y.Check(y.Wrapf(err, "Error while encrypting block in table builder."))
blockBuf = eBlock
}

// Copy over compressed/encrypted data back to the main buffer.
copy(b.buf[item.start:], blockBuf)

slicePool.Put(dst)

newend := item.start + uint32(len(blockBuf))
item.end = newend
}
}
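
A note on slicePool above: sync.Pool amortizes allocation of the compression destination buffer across blocks. A standalone sketch of the same Get/encode/copy/Put discipline — the copy happens before Put because, depending on the encoder, the result may alias the pooled slice (the function name and sizes are illustrative):

package main

import (
	"fmt"
	"sync"

	"github.com/golang/snappy"
)

var scratch = sync.Pool{New: func() interface{} { return make([]byte, 0, 4096) }}

// compressInto compresses src into out and returns the bytes written.
// The pooled slice is returned only after the copy, since the encoder's
// result may alias it.
func compressInto(out, src []byte) int {
	dst := scratch.Get().([]byte)[:0]
	enc := snappy.Encode(dst, src)
	n := copy(out, enc)
	scratch.Put(dst)
	return n
}

func main() {
	src := []byte("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
	out := make([]byte, snappy.MaxEncodedLen(len(src)))
	fmt.Println("compressed to", compressInto(out, src), "bytes")
}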

// Close closes the TableBuilder.
func (b *Builder) Close() {}

// Empty returns whether it's empty.
func (b *Builder) Empty() bool { return b.buf.Len() == 0 }
func (b *Builder) Empty() bool { return b.sz == 0 }

// keyDiff returns a suffix of newKey that is different from b.baseKey.
func (b *Builder) keyDiff(newKey []byte) []byte {
@@ -121,20 +192,30 @@ func (b *Builder) addHelper(key []byte, v y.ValueStruct, vpLen uint64) {
}

// store current entry's offset
y.AssertTrue(uint32(b.buf.Len()) < math.MaxUint32)
b.entryOffsets = append(b.entryOffsets, uint32(b.buf.Len())-b.baseOffset)
y.AssertTrue(uint32(b.sz) < math.MaxUint32)
b.entryOffsets = append(b.entryOffsets, uint32(b.sz)-b.baseOffset)

// Layout: header, diffKey, value.
b.buf.Write(h.Encode())
b.buf.Write(diffKey) // We only need to store the key difference.
b.append(h.Encode())
b.append(diffKey)

b.sz += v.Encode(b.buf[b.sz:])

v.EncodeTo(b.buf)
// Size of KV on SST.
sstSz := uint64(uint32(headerSize) + uint32(len(diffKey)) + v.EncodedSize())
// Total estimated size = size on SST + size on vlog (length of value pointer).
b.tableIndex.EstimatedSize += (sstSz + vpLen)
}

func (b *Builder) append(data []byte) {
copy(b.buf[b.sz:], data)
b.sz += len(data)
}

func (b *Builder) addPadding(sz int) {
b.sz += sz
}
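
These two helpers are the entire write path now that bytes.Buffer is gone: append copies into the preallocated buffer and advances the sz cursor; addPadding advances the cursor to reserve space a later pass can grow into. A tiny sketch of the cursor discipline (type name and sizes are illustrative):

package main

import "fmt"

type flatBuf struct {
	buf []byte
	sz  int
}

func (f *flatBuf) append(data []byte) {
	f.sz += copy(f.buf[f.sz:], data)
}

// addPadding reserves n bytes that a later pass may write into.
func (f *flatBuf) addPadding(n int) { f.sz += n }

func main() {
	f := &flatBuf{buf: make([]byte, 64)}
	f.append([]byte("header"))
	f.addPadding(8)
	fmt.Println(f.sz) // 14
}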

/*
Structure of Block.
+-------------------+---------------------+--------------------+--------------+------------------+
@@ -148,41 +229,39 @@
*/
// In case the data is encrypted, the "IV" is added to the end of the block.
func (b *Builder) finishBlock() {
b.buf.Write(y.U32SliceToBytes(b.entryOffsets))
b.buf.Write(y.U32ToBytes(uint32(len(b.entryOffsets))))

blockBuf := b.buf.Bytes()[b.baseOffset:] // Store checksum for current block.
b.writeChecksum(blockBuf)

// Compress the block.
if b.opt.Compression != options.None {
var err error
// TODO: Find a way to reuse buffers. Current implementation creates a
// new buffer for each compressData call.
blockBuf, err = b.compressData(b.buf.Bytes()[b.baseOffset:])
y.Check(err)
// Truncate already written data.
b.buf.Truncate(int(b.baseOffset))
// Write compressed data.
b.buf.Write(blockBuf)
}
if b.shouldEncrypt() {
block := b.buf.Bytes()[b.baseOffset:]
eBlock, err := b.encrypt(block)
y.Check(y.Wrapf(err, "Error while encrypting block in table builder."))
// We're rewriting the block, after encrypting.
b.buf.Truncate(int(b.baseOffset))
b.buf.Write(eBlock)
b.append(y.U32SliceToBytes(b.entryOffsets))
b.append(y.U32ToBytes(uint32(len(b.entryOffsets))))

b.writeChecksum(b.buf[b.baseOffset:b.sz])

// If compression/encryption is disabled, no need to send the block to the blockChan.
// There's nothing to be done.
if b.blockChan == nil {
b.addBlockToIndex()
return
}

// TODO(Ashish): Add padding: if we want to make the block a multiple of OS pages, we can
// implement padding. This might be useful while using direct I/O.
// When a block is encrypted, its length increases. We add 200 bytes of padding to
// handle cases when the block size increases.
padding := 200
b.addPadding(padding)

// Block end is the actual end of the block ignoring the padding.
block := &bblock{start: b.baseOffset, end: uint32(b.sz - padding), data: b.buf}
b.blockList = append(b.blockList, block)

// Add key to the block index
b.addBlockToIndex()
// Push to the block handler.
b.blockChan <- block
}
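
On the 200-byte pad: encryption appends a 16-byte AES IV (see the block-layout comment above), and an incompressible block can come out of the compressor larger than it went in. A back-of-envelope check under those assumptions — note that snappy's documented worst case exceeds the pad for 4 KB blocks, so the slack covers the expected case rather than a hard bound:

package main

import (
	"crypto/aes"
	"fmt"

	"github.com/golang/snappy"
)

const padding = 200 // slack finishBlock reserves after each block

func main() {
	blockLen := 4 * 1024 // a typical block size
	// Worst case: incompressible data grows under Snappy, then gains an IV.
	worst := snappy.MaxEncodedLen(blockLen) + aes.BlockSize
	fmt.Printf("worst case %d bytes vs slot of %d\n", worst, blockLen+padding)
	// Prints: worst case 4826 bytes vs slot of 4296.
}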

func (b *Builder) addBlockToIndex() {
blockBuf := b.buf[b.baseOffset:b.sz]
// Add key to the block index.
bo := &pb.BlockOffset{
Key: y.Copy(b.baseKey),
Offset: b.baseOffset,
Len: uint32(b.buf.Len()) - b.baseOffset,
Len: uint32(len(blockBuf)),
}
b.tableIndex.Offsets = append(b.tableIndex.Offsets, bo)
}
@@ -200,7 +279,7 @@ func (b *Builder) shouldFinishBlock(key []byte, value y.ValueStruct) bool {
4 + // size of list
8 + // Sum64 in checksum proto
4) // checksum length
estimatedSize := uint32(b.buf.Len()) - b.baseOffset + uint32(6 /*header size for entry*/) +
estimatedSize := uint32(b.sz) - b.baseOffset + uint32(6 /*header size for entry*/) +
uint32(len(key)) + uint32(value.EncodedSize()) + entriesOffsetsSize

if b.shouldEncrypt() {
@@ -217,8 +296,8 @@ func (b *Builder) Add(key []byte, value y.ValueStruct, valueLen uint32) {
b.finishBlock()
// Start a new block. Initialize the block.
b.baseKey = []byte{}
y.AssertTrue(uint32(b.buf.Len()) < math.MaxUint32)
b.baseOffset = uint32(b.buf.Len())
y.AssertTrue(uint32(b.sz) < math.MaxUint32)
b.baseOffset = uint32(b.sz)
b.entryOffsets = b.entryOffsets[:0]
}
b.addHelper(key, value, uint64(valueLen))
@@ -232,7 +311,7 @@ func (b *Builder) Add(key []byte, value y.ValueStruct, valueLen uint32) {

// ReachedCapacity returns true if we have roughly reached capacity.
func (b *Builder) ReachedCapacity(cap int64) bool {
blocksSize := b.buf.Len() + // length of current buffer
blocksSize := b.sz + // length of current buffer
len(b.entryOffsets)*4 + // all entry offsets size
4 + // count of all entry offsets
8 + // checksum bytes
@@ -266,24 +345,42 @@ func (b *Builder) Finish() []byte {

b.finishBlock() // This will never start a new block.

if b.blockChan != nil {
close(b.blockChan)
}
// Wait for block handler to finish.
b.wg.Wait()

// Fix block boundaries. This includes moving the blocks so that we
// don't have any interleaving space between them.
if len(b.blockList) > 0 {
start := uint32(0)
for i, bl := range b.blockList {
// Length of the block is end minus start.
b.tableIndex.Offsets[i].Len = bl.end - bl.start
b.tableIndex.Offsets[i].Offset = start

// Copy over the block to the current position in the main buffer.
copy(b.buf[start:], b.buf[bl.start:bl.end])
start = b.tableIndex.Offsets[i].Offset + b.tableIndex.Offsets[i].Len
}
// Resume writing to the buffer at the end of the compacted, valid data.
b.sz = int(start)
}

index, err := proto.Marshal(b.tableIndex)
y.Check(err)

if b.shouldEncrypt() {
index, err = b.encrypt(index)
y.Check(err)
}
// Write the index to the file.
n, err := b.buf.Write(index)
y.Check(err)

y.AssertTrue(uint32(n) < math.MaxUint32)
// Write index size.
_, err = b.buf.Write(y.U32ToBytes(uint32(n)))
y.Check(err)
// Write the index to the buffer.
b.append(index)
b.append(y.U32ToBytes(uint32(len(index))))

b.writeChecksum(index)
return b.buf.Bytes()
return b.buf[:b.sz]
}
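
For orientation, end-to-end use of the builder surface touched by this diff, sketched from the signatures shown here — the Options literal, the kv type, and the y.ValueStruct construction are abbreviated and partly assumed:

// Hypothetical driver built against the API in this diff.
type kv struct{ key, val []byte }

func buildTable(pairs []kv) []byte {
	opts := table.Options{
		TableSize:          64 << 20, // feeds the buf preallocation in NewTableBuilder
		BlockSize:          4 * 1024,
		BloomFalsePositive: 0.01,
		Compression:        options.ZSTD, // workers start only when compression/encryption is on
	}
	b := table.NewTableBuilder(opts)
	defer b.Close()

	for _, p := range pairs { // input must arrive in sorted key order
		b.Add(p.key, y.ValueStruct{Value: p.val}, 0)
	}
	// Finish waits for the workers, compacts the padded blocks, and
	// appends the index and its checksum.
	return b.Finish()
}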

func (b *Builder) writeChecksum(data []byte) {
@@ -304,13 +401,10 @@ func (b *Builder) writeChecksum(data []byte) {
// Write checksum to the file.
chksum, err := proto.Marshal(&checksum)
y.Check(err)
n, err := b.buf.Write(chksum)
y.Check(err)
b.append(chksum)

y.AssertTrue(uint32(n) < math.MaxUint32)
// Write checksum size.
_, err = b.buf.Write(y.U32ToBytes(uint32(n)))
y.Check(err)
b.append(y.U32ToBytes(uint32(len(chksum))))
}

// DataKey returns the datakey of the builder.
@@ -340,14 +434,14 @@ func (b *Builder) shouldEncrypt() bool {
}

// compressData compresses the given data.
func (b *Builder) compressData(data []byte) ([]byte, error) {
func (b *Builder) compressData(dst, data []byte) ([]byte, error) {
switch b.opt.Compression {
case options.None:
return data, nil
case options.Snappy:
return snappy.Encode(nil, data), nil
return snappy.Encode(dst, data), nil
case options.ZSTD:
return y.ZSTDCompress(nil, data, b.opt.ZSTDCompressionLevel)
return y.ZSTDCompress(dst, data, b.opt.ZSTDCompressionLevel)
}
return nil, errors.New("Unsupported compression type")
}
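
Threading dst through only pays off if the encoder can reuse it, and the two paths differ: golang/snappy reuses dst only when len(dst) >= snappy.MaxEncodedLen(len(src)) — it checks length, not capacity — so the zero-length dst handed over in handleBlock still allocates on the Snappy path, while cgo ZSTD bindings typically key on capacity instead. A small demonstration of the snappy behavior:

package main

import (
	"bytes"
	"fmt"

	"github.com/golang/snappy"
)

func main() {
	src := bytes.Repeat([]byte{'a'}, 4096)

	// len 0, large cap: snappy ignores spare capacity and allocates.
	small := make([]byte, 0, snappy.MaxEncodedLen(len(src)))
	out1 := snappy.Encode(small, src)

	// Full-length dst: snappy writes in place and returns a prefix of it.
	big := make([]byte, snappy.MaxEncodedLen(len(src)))
	out2 := snappy.Encode(big, src)

	fmt.Println(&out1[0] == &small[:1][0]) // false: new allocation
	fmt.Println(&out2[0] == &big[0])       // true: dst reused
}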