From ee6585655b8fdd725529ce99924c6c3465cddeb9 Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Fri, 31 Jan 2020 17:21:08 +0530 Subject: [PATCH 01/33] Doesn't work --- table/builder.go | 65 +++++++++++++++++++------------------------ table/builder_test.go | 7 +++-- table/table.go | 5 ++++ 3 files changed, 38 insertions(+), 39 deletions(-) diff --git a/table/builder.go b/table/builder.go index 26519494b..ba26789a5 100644 --- a/table/builder.go +++ b/table/builder.go @@ -64,7 +64,7 @@ func (h *header) Decode(buf []byte) { // Builder is used in building a table. type Builder struct { // Typically tens or hundreds of meg. This is for one single file. - buf *bytes.Buffer + buf []byte baseKey []byte // Base key for the current block. baseOffset uint32 // Offset for the current block. @@ -77,7 +77,7 @@ type Builder struct { // NewTableBuilder makes a new TableBuilder. func NewTableBuilder(opts Options) *Builder { return &Builder{ - buf: newBuffer(1 << 20), + buf: make([]byte, 0, 1<<20), tableIndex: &pb.TableIndex{}, keyHashes: make([]uint64, 0, 1024), // Avoid some malloc calls. opt: &opts, @@ -88,7 +88,7 @@ func NewTableBuilder(opts Options) *Builder { func (b *Builder) Close() {} // Empty returns whether it's empty. -func (b *Builder) Empty() bool { return b.buf.Len() == 0 } +func (b *Builder) Empty() bool { return len(b.buf) == 0 } // keyDiff returns a suffix of newKey that is different from b.baseKey. func (b *Builder) keyDiff(newKey []byte) []byte { @@ -121,14 +121,16 @@ func (b *Builder) addHelper(key []byte, v y.ValueStruct, vpLen uint64) { } // store current entry's offset - y.AssertTrue(uint32(b.buf.Len()) < math.MaxUint32) - b.entryOffsets = append(b.entryOffsets, uint32(b.buf.Len())-b.baseOffset) + y.AssertTrue(uint32(len(b.buf)) < math.MaxUint32) + b.entryOffsets = append(b.entryOffsets, uint32(len(b.buf))-b.baseOffset) // Layout: header, diffKey, value. - b.buf.Write(h.Encode()) - b.buf.Write(diffKey) // We only need to store the key difference. + b.buf = append(b.buf, h.Encode()...) + b.buf = append(b.buf, diffKey...) - v.EncodeTo(b.buf) + bb := &bytes.Buffer{} + v.EncodeTo(bb) + b.buf = append(b.buf, bb.Bytes()...) // Size of KV on SST. sstSz := uint64(uint32(headerSize) + uint32(len(diffKey)) + v.EncodedSize()) // Total estimated size = size on SST + size on vlog (length of value pointer). @@ -148,10 +150,10 @@ Structure of Block. */ // In case the data is encrypted, the "IV" is added to the end of the block. func (b *Builder) finishBlock() { - b.buf.Write(y.U32SliceToBytes(b.entryOffsets)) - b.buf.Write(y.U32ToBytes(uint32(len(b.entryOffsets)))) + b.buf = append(b.buf, y.U32SliceToBytes(b.entryOffsets)...) + b.buf = append(b.buf, y.U32ToBytes(uint32(len(b.entryOffsets)))...) - blockBuf := b.buf.Bytes()[b.baseOffset:] // Store checksum for current block. + blockBuf := b.buf[b.baseOffset:] // Store checksum for current block. b.writeChecksum(blockBuf) // Compress the block. @@ -159,22 +161,18 @@ func (b *Builder) finishBlock() { var err error // TODO: Find a way to reuse buffers. Current implementation creates a // new buffer for each compressData call. - blockBuf, err = b.compressData(b.buf.Bytes()[b.baseOffset:]) + blockBuf, err = b.compressData(b.buf[b.baseOffset:]) y.Check(err) - // Truncate already written data. - b.buf.Truncate(int(b.baseOffset)) - // Write compressed data. - b.buf.Write(blockBuf) } if b.shouldEncrypt() { - block := b.buf.Bytes()[b.baseOffset:] + block := b.buf[b.baseOffset:] eBlock, err := b.encrypt(block) y.Check(y.Wrapf(err, "Error while encrypting block in table builder.")) - // We're rewriting the block, after encrypting. - b.buf.Truncate(int(b.baseOffset)) - b.buf.Write(eBlock) + blockBuf = eBlock } + b.buf = append(b.buf[b.baseOffset:], blockBuf...) + // TODO(Ashish):Add padding: If we want to make block as multiple of OS pages, we can // implement padding. This might be useful while using direct I/O. @@ -182,7 +180,7 @@ func (b *Builder) finishBlock() { bo := &pb.BlockOffset{ Key: y.Copy(b.baseKey), Offset: b.baseOffset, - Len: uint32(b.buf.Len()) - b.baseOffset, + Len: uint32(len(blockBuf)), } b.tableIndex.Offsets = append(b.tableIndex.Offsets, bo) } @@ -200,7 +198,7 @@ func (b *Builder) shouldFinishBlock(key []byte, value y.ValueStruct) bool { 4 + // size of list 8 + // Sum64 in checksum proto 4) // checksum length - estimatedSize := uint32(b.buf.Len()) - b.baseOffset + uint32(6 /*header size for entry*/) + + estimatedSize := uint32(len(b.buf)) - b.baseOffset + uint32(6 /*header size for entry*/) + uint32(len(key)) + uint32(value.EncodedSize()) + entriesOffsetsSize if b.shouldEncrypt() { @@ -217,8 +215,8 @@ func (b *Builder) Add(key []byte, value y.ValueStruct, valueLen uint32) { b.finishBlock() // Start a new block. Initialize the block. b.baseKey = []byte{} - y.AssertTrue(uint32(b.buf.Len()) < math.MaxUint32) - b.baseOffset = uint32(b.buf.Len()) + y.AssertTrue(uint32(len(b.buf)) < math.MaxUint32) + b.baseOffset = uint32(len(b.buf)) b.entryOffsets = b.entryOffsets[:0] } b.addHelper(key, value, uint64(valueLen)) @@ -232,7 +230,7 @@ func (b *Builder) Add(key []byte, value y.ValueStruct, valueLen uint32) { // ReachedCapacity returns true if we... roughly (?) reached capacity? func (b *Builder) ReachedCapacity(cap int64) bool { - blocksSize := b.buf.Len() + // length of current buffer + blocksSize := len(b.buf) + // length of current buffer len(b.entryOffsets)*4 + // all entry offsets size 4 + // count of all entry offsets 8 + // checksum bytes @@ -273,17 +271,13 @@ func (b *Builder) Finish() []byte { index, err = b.encrypt(index) y.Check(err) } + b.buf = append(b.buf, index...) // Write index the file. - n, err := b.buf.Write(index) - y.Check(err) - y.AssertTrue(uint32(n) < math.MaxUint32) - // Write index size. - _, err = b.buf.Write(y.U32ToBytes(uint32(n))) - y.Check(err) + b.buf = append(b.buf, y.U32ToBytes(uint32(len(index)))...) b.writeChecksum(index) - return b.buf.Bytes() + return b.buf } func (b *Builder) writeChecksum(data []byte) { @@ -304,13 +298,10 @@ func (b *Builder) writeChecksum(data []byte) { // Write checksum to the file. chksum, err := proto.Marshal(&checksum) y.Check(err) - n, err := b.buf.Write(chksum) - y.Check(err) + b.buf = append(b.buf, chksum...) - y.AssertTrue(uint32(n) < math.MaxUint32) // Write checksum size. - _, err = b.buf.Write(y.U32ToBytes(uint32(n))) - y.Check(err) + b.buf = append(b.buf, y.U32ToBytes(uint32(len(chksum)))...) } // DataKey returns datakey of the builder. diff --git a/table/builder_test.go b/table/builder_test.go index 9650a9275..2c50a45cc 100644 --- a/table/builder_test.go +++ b/table/builder_test.go @@ -54,7 +54,7 @@ func TestTableIndex(t *testing.T) { // Compression mode. opts = append(opts, Options{BlockSize: 4 * 1024, BloomFalsePositive: 0.01, Compression: options.ZSTD}) - keysCount := 10000 + keysCount := 2 for _, opt := range opts { builder := NewTableBuilder(opt) filename := fmt.Sprintf("%s%c%d.sst", os.TempDir(), os.PathSeparator, rand.Uint32()) @@ -65,7 +65,9 @@ func TestTableIndex(t *testing.T) { blockCount := 0 for i := 0; i < keysCount; i++ { k := []byte(fmt.Sprintf("%016x", i)) - v := fmt.Sprintf("%d", i) + v := make([]byte, 4<<10) + rand.Read(v) + //v := fmt.Sprintf("%d", i) vs := y.ValueStruct{Value: []byte(v)} if i == 0 { // This is first key for first block. blockFirstKeys = append(blockFirstKeys, k) @@ -80,6 +82,7 @@ func TestTableIndex(t *testing.T) { require.NoError(t, err, "unable to write to file") tbl, err := OpenTable(f, opt) + require.NoError(t, err) if opt.DataKey == nil { // key id is zero if thre is no datakey. require.Equal(t, tbl.KeyID(), uint64(0)) diff --git a/table/table.go b/table/table.go index 25227be34..7c2d60539 100644 --- a/table/table.go +++ b/table/table.go @@ -431,15 +431,20 @@ func (t *Table) block(idx int) (*block, error) { "corrupted or the table options are incorrectly set") } + fmt.Println("readpos", readPos) // Read checksum and store it readPos -= blk.chkLen blk.checksum = blk.data[readPos : readPos+blk.chkLen] // Move back and read numEntries in the block. readPos -= 4 + fmt.Println("readpos", readPos) numEntries := int(y.BytesToU32(blk.data[readPos : readPos+4])) + fmt.Println("num entries", numEntries) entriesIndexStart := readPos - (numEntries * 4) + fmt.Println("readpos", entriesIndexStart) entriesIndexEnd := entriesIndexStart + numEntries*4 + fmt.Println("readpos", entriesIndexEnd, entriesIndexStart) blk.entryOffsets = y.BytesToU32Slice(blk.data[entriesIndexStart:entriesIndexEnd]) blk.entriesIndexStart = entriesIndexStart From 72d1932570535eafb46127038f2bc114b54f0031 Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Tue, 11 Feb 2020 18:45:24 +0530 Subject: [PATCH 02/33] Everything works but has race --- go.mod | 2 + go.sum | 2 + options.go | 1 + table/builder.go | 117 +++++++++++++++++++++++++++++++++--------- table/builder_test.go | 61 ++++++++++++---------- table/iterator.go | 2 +- table/table.go | 10 ++-- table/table_test.go | 2 +- 8 files changed, 139 insertions(+), 58 deletions(-) diff --git a/go.mod b/go.mod index eae04e485..8f0bed624 100644 --- a/go.mod +++ b/go.mod @@ -5,11 +5,13 @@ go 1.12 require ( github.com/DataDog/zstd v1.4.1 github.com/cespare/xxhash v1.1.0 + github.com/davecgh/go-spew v1.1.1 github.com/dgraph-io/ristretto v0.0.2-0.20200115201040-8f368f2f2ab3 github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 github.com/dustin/go-humanize v1.0.0 github.com/golang/protobuf v1.3.1 github.com/golang/snappy v0.0.1 + github.com/google/uuid v1.1.1 github.com/kr/pretty v0.1.0 // indirect github.com/pkg/errors v0.8.1 github.com/spaolacci/murmur3 v1.1.0 // indirect diff --git a/go.sum b/go.sum index 4c71dbdf4..d189d4baa 100644 --- a/go.sum +++ b/go.sum @@ -24,6 +24,8 @@ github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= +github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= diff --git a/options.go b/options.go index 4fbe09199..0e3c60e3f 100644 --- a/options.go +++ b/options.go @@ -155,6 +155,7 @@ func DefaultOptions(path string) Options { func buildTableOptions(opt Options) table.Options { return table.Options{ + TableSize: uint64(opt.MaxTableSize), BlockSize: opt.BlockSize, BloomFalsePositive: opt.BloomFalsePositive, LoadingMode: opt.TableLoadingMode, diff --git a/table/builder.go b/table/builder.go index ba26789a5..2bb078f1d 100644 --- a/table/builder.go +++ b/table/builder.go @@ -20,6 +20,7 @@ import ( "bytes" "crypto/aes" "math" + "sync" "unsafe" "github.com/dgryski/go-farm" @@ -61,6 +62,15 @@ func (h *header) Decode(buf []byte) { copy(((*[headerSize]byte)(unsafe.Pointer(h))[:]), buf[:headerSize]) } +type bblock struct { + data []byte + key []byte + idx int + start uint32 + end uint32 + done bool +} + // Builder is used in building a table. type Builder struct { // Typically tens or hundreds of meg. This is for one single file. @@ -72,15 +82,60 @@ type Builder struct { tableIndex *pb.TableIndex keyHashes []uint64 // Used for building the bloomfilter. opt *Options + + // Used to concurrently compress the blocks. + inCloser sync.WaitGroup + length uint32 + idx int + inChan chan *bblock + blockList []*bblock } // NewTableBuilder makes a new TableBuilder. func NewTableBuilder(opts Options) *Builder { - return &Builder{ - buf: make([]byte, 0, 1<<20), + b := &Builder{ + // Additional 200 bytes to store index (approximate). + buf: make([]byte, 0, opts.TableSize+MB*100), tableIndex: &pb.TableIndex{}, keyHashes: make([]uint64, 0, 1024), // Avoid some malloc calls. opt: &opts, + inChan: make(chan *bblock, 1), + } + + count := 1 + b.inCloser.Add(1) + for i := 0; i < count; i++ { + go b.handleBlock(i) + } + return b +} + +func (b *Builder) handleBlock(i int) { + defer b.inCloser.Done() + for item := range b.inChan { + // uid := uuid.New() + // fmt.Println(uid, "-routine", i, "Processing", item.idx, "start", item.start, "with end", item.end) + // Extract the item + blockBuf := item.data[item.start:item.end] + // Compress the block. + if b.opt.Compression != options.None { + var err error + // TODO: Find a way to reuse buffers. Current implementation creates a + // new buffer for each compressData call. + blockBuf, err = b.compressData(blockBuf) + y.Check(err) + } + if b.shouldEncrypt() { + eBlock, err := b.encrypt(blockBuf) + y.Check(y.Wrapf(err, "Error while encrypting block in table builder.")) + blockBuf = eBlock + } + + // THIS IS IMPORTAN!!!!! + copy(b.buf[item.start:], blockBuf) + + newend := item.start + uint32(len(blockBuf)) + item.end = newend } } @@ -124,12 +179,15 @@ func (b *Builder) addHelper(key []byte, v y.ValueStruct, vpLen uint64) { y.AssertTrue(uint32(len(b.buf)) < math.MaxUint32) b.entryOffsets = append(b.entryOffsets, uint32(len(b.buf))-b.baseOffset) + // fmt.Println("cap of b.buf", cap(b.buf[len(b.buf):])) + // Layout: header, diffKey, value. b.buf = append(b.buf, h.Encode()...) b.buf = append(b.buf, diffKey...) bb := &bytes.Buffer{} v.EncodeTo(bb) + b.buf = append(b.buf, bb.Bytes()...) // Size of KV on SST. sstSz := uint64(uint32(headerSize) + uint32(len(diffKey)) + v.EncodedSize()) @@ -150,31 +208,27 @@ Structure of Block. */ // In case the data is encrypted, the "IV" is added to the end of the block. func (b *Builder) finishBlock() { + //copy(b.buf[len(b.buf):], y.U32SliceToBytes(b.entryOffsets)) + //copy(b.buf[len(b.buf):], y.U32ToBytes(uint32(len(b.entryOffsets)))) b.buf = append(b.buf, y.U32SliceToBytes(b.entryOffsets)...) b.buf = append(b.buf, y.U32ToBytes(uint32(len(b.entryOffsets)))...) - blockBuf := b.buf[b.baseOffset:] // Store checksum for current block. - b.writeChecksum(blockBuf) - - // Compress the block. - if b.opt.Compression != options.None { - var err error - // TODO: Find a way to reuse buffers. Current implementation creates a - // new buffer for each compressData call. - blockBuf, err = b.compressData(b.buf[b.baseOffset:]) - y.Check(err) - } - if b.shouldEncrypt() { - block := b.buf[b.baseOffset:] - eBlock, err := b.encrypt(block) - y.Check(y.Wrapf(err, "Error while encrypting block in table builder.")) - blockBuf = eBlock - } - - b.buf = append(b.buf[b.baseOffset:], blockBuf...) + //fmt.Println("cap ", len(b.buf), cap(b.buf)) + b.writeChecksum(b.buf[b.baseOffset:]) - // TODO(Ashish):Add padding: If we want to make block as multiple of OS pages, we can - // implement padding. This might be useful while using direct I/O. + blockBuf := b.buf[b.baseOffset:] // Store checksum for current block. + //fmt.Println("cap ", len(b.buf), cap(b.buf)) + padding := 200 + // Add 30 bytes of empty space + //copy(b.buf[len(b.buf):], make([]byte, padding)) + //fmt.Println("cap ", len(b.buf), cap(b.buf)) + b.buf = append(b.buf, make([]byte, padding)...) + + // Add 30 bytes of empty space + //====================================================== + block := &bblock{idx: b.idx, start: b.baseOffset, end: uint32(len(b.buf) - padding), data: b.buf} + b.idx++ + b.blockList = append(b.blockList, block) // Add key to the block index bo := &pb.BlockOffset{ @@ -183,6 +237,8 @@ func (b *Builder) finishBlock() { Len: uint32(len(blockBuf)), } b.tableIndex.Offsets = append(b.tableIndex.Offsets, bo) + // Push to the block handler. + b.inChan <- block } func (b *Builder) shouldFinishBlock(key []byte, value y.ValueStruct) bool { @@ -264,6 +320,20 @@ func (b *Builder) Finish() []byte { b.finishBlock() // This will never start a new block. + close(b.inChan) + // Wait for handler to finish + b.inCloser.Wait() + + start := uint32(0) + for i, bl := range b.blockList { + b.tableIndex.Offsets[i].Len = bl.end - bl.start + b.tableIndex.Offsets[i].Offset = start + + copy(b.buf[start:], b.buf[bl.start:bl.end]) + start = bl.end + } + b.buf = b.buf[:start] + index, err := proto.Marshal(b.tableIndex) y.Check(err) @@ -273,7 +343,6 @@ func (b *Builder) Finish() []byte { } b.buf = append(b.buf, index...) // Write index the file. - b.buf = append(b.buf, y.U32ToBytes(uint32(len(index)))...) b.writeChecksum(index) diff --git a/table/builder_test.go b/table/builder_test.go index 2c50a45cc..d2c20fc4d 100644 --- a/table/builder_test.go +++ b/table/builder_test.go @@ -32,30 +32,39 @@ import ( func TestTableIndex(t *testing.T) { rand.Seed(time.Now().Unix()) - keyPrefix := "key" - t.Run("single key", func(t *testing.T) { - opts := Options{Compression: options.ZSTD} - f := buildTestTable(t, keyPrefix, 1, opts) - tbl, err := OpenTable(f, opts) - require.NoError(t, err) - require.Len(t, tbl.blockIndex, 1) - }) + keysCount := 1000 + key := make([]byte, 32) + _, err := rand.Read(key) + require.NoError(t, err) + subTest := []struct { + name string + opts Options + }{ + { + name: "No encyption/compression", + opts: Options{BlockSize: 4 * 1024, BloomFalsePositive: 0.01}, + }, + { + // Encryption mode. + name: "Only encryption", + opts: Options{BlockSize: 4 * 1024, BloomFalsePositive: 0.01, DataKey: &pb.DataKey{Data: key}}, + }, + { + // Compression mode. + name: "Only compression", + opts: Options{BlockSize: 4 * 1024, BloomFalsePositive: 0.01, Compression: options.ZSTD}, + }, + { + // Compression mode and encryption. + name: "Compression and encryption", + opts: Options{BlockSize: 4 * 1024, BloomFalsePositive: 0.01, Compression: options.ZSTD, + DataKey: &pb.DataKey{Data: key}}, + }, + } - t.Run("multiple keys", func(t *testing.T) { - opts := []Options{} - // Normal mode. - opts = append(opts, Options{BlockSize: 4 * 1024, BloomFalsePositive: 0.01}) - // Encryption mode. - key := make([]byte, 32) - _, err := rand.Read(key) - require.NoError(t, err) - opts = append(opts, Options{BlockSize: 4 * 1024, BloomFalsePositive: 0.01, - DataKey: &pb.DataKey{Data: key}}) - // Compression mode. - opts = append(opts, Options{BlockSize: 4 * 1024, BloomFalsePositive: 0.01, - Compression: options.ZSTD}) - keysCount := 2 - for _, opt := range opts { + for _, tt := range subTest { + t.Run(tt.name, func(t *testing.T) { + opt := tt.opts builder := NewTableBuilder(opt) filename := fmt.Sprintf("%s%c%d.sst", os.TempDir(), os.PathSeparator, rand.Uint32()) f, err := y.OpenSyncedFile(filename, true) @@ -65,7 +74,7 @@ func TestTableIndex(t *testing.T) { blockCount := 0 for i := 0; i < keysCount; i++ { k := []byte(fmt.Sprintf("%016x", i)) - v := make([]byte, 4<<10) + v := make([]byte, 10) rand.Read(v) //v := fmt.Sprintf("%d", i) vs := y.ValueStruct{Value: []byte(v)} @@ -96,8 +105,8 @@ func TestTableIndex(t *testing.T) { } f.Close() require.NoError(t, os.RemoveAll(filename)) - } - }) + }) + } } func TestInvalidCompression(t *testing.T) { diff --git a/table/iterator.go b/table/iterator.go index a1e760c78..c3fb39963 100644 --- a/table/iterator.go +++ b/table/iterator.go @@ -60,13 +60,13 @@ func (itr *blockIterator) setIdx(i int) { } itr.err = nil startOffset := int(itr.entryOffsets[i]) - // Set base key. if len(itr.baseKey) == 0 { var baseHeader header baseHeader.Decode(itr.data) itr.baseKey = itr.data[headerSize : headerSize+baseHeader.diff] } + var endOffset int // idx points to the last entry in the block. if itr.idx+1 == len(itr.entryOffsets) { diff --git a/table/table.go b/table/table.go index 7c2d60539..ba863c0fb 100644 --- a/table/table.go +++ b/table/table.go @@ -49,6 +49,9 @@ const intSize = int(unsafe.Sizeof(int(0))) type Options struct { // Options for Opening/Building Table. + // Maximum size of the table + TableSize uint64 + // ChkMode is the checksum verification mode for Table. ChkMode options.ChecksumVerificationMode @@ -427,24 +430,19 @@ func (t *Table) block(idx int) (*block, error) { // Checksum length greater than block size could happen if the table was compressed and // it was opened with an incorrect compression algorithm (or the data was corrupted). if blk.chkLen > len(blk.data) { - return nil, errors.New("invalid checksum length. Either the data is" + + return nil, errors.New("invalid checksum length. Either the data is " + "corrupted or the table options are incorrectly set") } - fmt.Println("readpos", readPos) // Read checksum and store it readPos -= blk.chkLen blk.checksum = blk.data[readPos : readPos+blk.chkLen] // Move back and read numEntries in the block. readPos -= 4 - fmt.Println("readpos", readPos) numEntries := int(y.BytesToU32(blk.data[readPos : readPos+4])) - fmt.Println("num entries", numEntries) entriesIndexStart := readPos - (numEntries * 4) - fmt.Println("readpos", entriesIndexStart) entriesIndexEnd := entriesIndexStart + numEntries*4 - fmt.Println("readpos", entriesIndexEnd, entriesIndexStart) blk.entryOffsets = y.BytesToU32Slice(blk.data[entriesIndexStart:entriesIndexEnd]) blk.entriesIndexStart = entriesIndexStart diff --git a/table/table_test.go b/table/table_test.go index 27a4f1d16..493c04b56 100644 --- a/table/table_test.go +++ b/table/table_test.go @@ -356,7 +356,7 @@ func TestIterateBackAndForth(t *testing.T) { it.seekToFirst() k = it.Key() - require.EqualValues(t, key("key", 0), y.ParseKey(k)) + require.EqualValues(t, key("key", 0), string(y.ParseKey(k))) } func TestUniIterator(t *testing.T) { From ebbb23ee11aa50756d56e22a4ca15e585474b827 Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Tue, 11 Feb 2020 20:15:48 +0530 Subject: [PATCH 03/33] Everything works but super slow --- table/builder.go | 84 ++++++++++++++++++++++++++++++++----------- table/builder_test.go | 2 +- 2 files changed, 65 insertions(+), 21 deletions(-) diff --git a/table/builder.go b/table/builder.go index 2bb078f1d..8d2222c2b 100644 --- a/table/builder.go +++ b/table/builder.go @@ -20,6 +20,7 @@ import ( "bytes" "crypto/aes" "math" + "runtime" "sync" "unsafe" @@ -75,6 +76,7 @@ type bblock struct { type Builder struct { // Typically tens or hundreds of meg. This is for one single file. buf []byte + sz int baseKey []byte // Base key for the current block. baseOffset uint32 // Offset for the current block. @@ -95,15 +97,15 @@ type Builder struct { func NewTableBuilder(opts Options) *Builder { b := &Builder{ // Additional 200 bytes to store index (approximate). - buf: make([]byte, 0, opts.TableSize+MB*100), + buf: make([]byte, opts.TableSize+MB*200), tableIndex: &pb.TableIndex{}, keyHashes: make([]uint64, 0, 1024), // Avoid some malloc calls. opt: &opts, inChan: make(chan *bblock, 1), } - count := 1 - b.inCloser.Add(1) + count := runtime.NumCPU() + b.inCloser.Add(count) for i := 0; i < count; i++ { go b.handleBlock(i) } @@ -143,7 +145,7 @@ func (b *Builder) handleBlock(i int) { func (b *Builder) Close() {} // Empty returns whether it's empty. -func (b *Builder) Empty() bool { return len(b.buf) == 0 } +func (b *Builder) Empty() bool { return b.sz == 0 } // keyDiff returns a suffix of newKey that is different from b.baseKey. func (b *Builder) keyDiff(newKey []byte) []byte { @@ -176,25 +178,33 @@ func (b *Builder) addHelper(key []byte, v y.ValueStruct, vpLen uint64) { } // store current entry's offset - y.AssertTrue(uint32(len(b.buf)) < math.MaxUint32) - b.entryOffsets = append(b.entryOffsets, uint32(len(b.buf))-b.baseOffset) + y.AssertTrue(uint32(b.sz) < math.MaxUint32) + b.entryOffsets = append(b.entryOffsets, uint32(b.sz)-b.baseOffset) // fmt.Println("cap of b.buf", cap(b.buf[len(b.buf):])) // Layout: header, diffKey, value. - b.buf = append(b.buf, h.Encode()...) - b.buf = append(b.buf, diffKey...) + b.append(h.Encode()) + //b.buf = append(b.buf, h.Encode()...) + b.append(diffKey) + //b.buf = append(b.buf, diffKey...) bb := &bytes.Buffer{} v.EncodeTo(bb) - b.buf = append(b.buf, bb.Bytes()...) + b.append(bb.Bytes()) + //b.buf = append(b.buf, bb.Bytes()...) // Size of KV on SST. sstSz := uint64(uint32(headerSize) + uint32(len(diffKey)) + v.EncodedSize()) // Total estimated size = size on SST + size on vlog (length of value pointer). b.tableIndex.EstimatedSize += (sstSz + vpLen) } +func (b *Builder) append(data []byte) { + copy(b.buf[b.sz:], data) + b.sz += len(data) +} + /* Structure of Block. +-------------------+---------------------+--------------------+--------------+------------------+ @@ -210,8 +220,11 @@ Structure of Block. func (b *Builder) finishBlock() { //copy(b.buf[len(b.buf):], y.U32SliceToBytes(b.entryOffsets)) //copy(b.buf[len(b.buf):], y.U32ToBytes(uint32(len(b.entryOffsets)))) - b.buf = append(b.buf, y.U32SliceToBytes(b.entryOffsets)...) - b.buf = append(b.buf, y.U32ToBytes(uint32(len(b.entryOffsets)))...) + //b.buf = append(b.buf, y.U32SliceToBytes(b.entryOffsets)...) + //spew.Dump(b.entryOffsets) + b.append(y.U32SliceToBytes(b.entryOffsets)) + //b.buf = append(b.buf, y.U32ToBytes(uint32(len(b.entryOffsets)))...) + b.append(y.U32ToBytes(uint32(len(b.entryOffsets)))) //fmt.Println("cap ", len(b.buf), cap(b.buf)) b.writeChecksum(b.buf[b.baseOffset:]) @@ -222,11 +235,14 @@ func (b *Builder) finishBlock() { // Add 30 bytes of empty space //copy(b.buf[len(b.buf):], make([]byte, padding)) //fmt.Println("cap ", len(b.buf), cap(b.buf)) - b.buf = append(b.buf, make([]byte, padding)...) + + b.append(make([]byte, padding)) + //b.buf = append(b.buf, make([]byte, padding)...) // Add 30 bytes of empty space //====================================================== - block := &bblock{idx: b.idx, start: b.baseOffset, end: uint32(len(b.buf) - padding), data: b.buf} + //block := &bblock{idx: b.idx, start: b.baseOffset, end: uint32(len(b.buf) - padding), data: b.buf} + block := &bblock{idx: b.idx, start: b.baseOffset, end: uint32(b.sz - padding), data: b.buf} b.idx++ b.blockList = append(b.blockList, block) @@ -254,7 +270,7 @@ func (b *Builder) shouldFinishBlock(key []byte, value y.ValueStruct) bool { 4 + // size of list 8 + // Sum64 in checksum proto 4) // checksum length - estimatedSize := uint32(len(b.buf)) - b.baseOffset + uint32(6 /*header size for entry*/) + + estimatedSize := uint32(b.sz) - b.baseOffset + uint32(6 /*header size for entry*/) + uint32(len(key)) + uint32(value.EncodedSize()) + entriesOffsetsSize if b.shouldEncrypt() { @@ -271,8 +287,8 @@ func (b *Builder) Add(key []byte, value y.ValueStruct, valueLen uint32) { b.finishBlock() // Start a new block. Initialize the block. b.baseKey = []byte{} - y.AssertTrue(uint32(len(b.buf)) < math.MaxUint32) - b.baseOffset = uint32(len(b.buf)) + y.AssertTrue(uint32(b.sz) < math.MaxUint32) + b.baseOffset = uint32((b.sz)) b.entryOffsets = b.entryOffsets[:0] } b.addHelper(key, value, uint64(valueLen)) @@ -286,7 +302,7 @@ func (b *Builder) Add(key []byte, value y.ValueStruct, valueLen uint32) { // ReachedCapacity returns true if we... roughly (?) reached capacity? func (b *Builder) ReachedCapacity(cap int64) bool { - blocksSize := len(b.buf) + // length of current buffer + blocksSize := b.sz + // length of current buffer len(b.entryOffsets)*4 + // all entry offsets size 4 + // count of all entry offsets 8 + // checksum bytes @@ -341,11 +357,37 @@ func (b *Builder) Finish() []byte { index, err = b.encrypt(index) y.Check(err) } + //b.append(index) b.buf = append(b.buf, index...) // Write index the file. b.buf = append(b.buf, y.U32ToBytes(uint32(len(index)))...) + //b.append(y.U32ToBytes(uint32(len(index)))) - b.writeChecksum(index) + //b.append(make([]byte, 10)) + //b.writeChecksum(index) + // Build checksum for the index. + checksum := pb.Checksum{ + // TODO: The checksum type should be configurable from the + // options. + // We chose to use CRC32 as the default option because + // it performed better compared to xxHash64. + // See the BenchmarkChecksum in table_test.go file + // Size => 1024 B 2048 B + // CRC32 => 63.7 ns/op 112 ns/op + // xxHash64 => 87.5 ns/op 158 ns/op + Sum: y.CalculateChecksum(index, pb.Checksum_CRC32C), + Algo: pb.Checksum_CRC32C, + } + + // Write checksum to the file. + chksum, err := proto.Marshal(&checksum) + y.Check(err) + b.buf = append(b.buf, chksum...) + //b.buf = append(b.buf, chksum...) + + // Write checksum size. + //b.buf = append(b.buf, y.U32ToBytes(uint32(len(chksum)))...) + b.buf = append(b.buf, y.U32ToBytes(uint32(len(chksum)))...) return b.buf } @@ -367,10 +409,12 @@ func (b *Builder) writeChecksum(data []byte) { // Write checksum to the file. chksum, err := proto.Marshal(&checksum) y.Check(err) - b.buf = append(b.buf, chksum...) + b.append(chksum) + //b.buf = append(b.buf, chksum...) // Write checksum size. - b.buf = append(b.buf, y.U32ToBytes(uint32(len(chksum)))...) + //b.buf = append(b.buf, y.U32ToBytes(uint32(len(chksum)))...) + b.append(y.U32ToBytes(uint32(len(chksum)))) } // DataKey returns datakey of the builder. diff --git a/table/builder_test.go b/table/builder_test.go index d2c20fc4d..c8abc33be 100644 --- a/table/builder_test.go +++ b/table/builder_test.go @@ -32,7 +32,7 @@ import ( func TestTableIndex(t *testing.T) { rand.Seed(time.Now().Unix()) - keysCount := 1000 + keysCount := 100000 key := make([]byte, 32) _, err := rand.Read(key) require.NoError(t, err) From 62cda51e66b4ef040ff4caa387ff600be82764aa Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Wed, 12 Feb 2020 13:55:18 +0530 Subject: [PATCH 04/33] Everything works but slower than master --- table/builder.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/table/builder.go b/table/builder.go index 8d2222c2b..79624aa47 100644 --- a/table/builder.go +++ b/table/builder.go @@ -227,9 +227,9 @@ func (b *Builder) finishBlock() { b.append(y.U32ToBytes(uint32(len(b.entryOffsets)))) //fmt.Println("cap ", len(b.buf), cap(b.buf)) - b.writeChecksum(b.buf[b.baseOffset:]) + b.writeChecksum(b.buf[b.baseOffset:b.sz]) - blockBuf := b.buf[b.baseOffset:] // Store checksum for current block. + blockBuf := b.buf[b.baseOffset:b.sz] // Store checksum for current block. //fmt.Println("cap ", len(b.buf), cap(b.buf)) padding := 200 // Add 30 bytes of empty space From 7230da4ae038ff8942b5d66cab54b33952190f26 Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Wed, 12 Feb 2020 15:12:19 +0530 Subject: [PATCH 05/33] Reuse encode buffer --- table/builder.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/table/builder.go b/table/builder.go index 79624aa47..6fef318cb 100644 --- a/table/builder.go +++ b/table/builder.go @@ -20,7 +20,6 @@ import ( "bytes" "crypto/aes" "math" - "runtime" "sync" "unsafe" @@ -77,6 +76,7 @@ type Builder struct { // Typically tens or hundreds of meg. This is for one single file. buf []byte sz int + bb bytes.Buffer baseKey []byte // Base key for the current block. baseOffset uint32 // Offset for the current block. @@ -101,10 +101,10 @@ func NewTableBuilder(opts Options) *Builder { tableIndex: &pb.TableIndex{}, keyHashes: make([]uint64, 0, 1024), // Avoid some malloc calls. opt: &opts, - inChan: make(chan *bblock, 1), + inChan: make(chan *bblock, 1000), } - count := runtime.NumCPU() + count := 4 //runtime.NumCPU() b.inCloser.Add(count) for i := 0; i < count; i++ { go b.handleBlock(i) @@ -189,8 +189,10 @@ func (b *Builder) addHelper(key []byte, v y.ValueStruct, vpLen uint64) { b.append(diffKey) //b.buf = append(b.buf, diffKey...) - bb := &bytes.Buffer{} + bb := &b.bb + bb.Reset() v.EncodeTo(bb) + //b.sz += v.EncodeTo1(b.buf, b.sz) b.append(bb.Bytes()) //b.buf = append(b.buf, bb.Bytes()...) From 277b0af55d9dd26d7632249532eab640102284ba Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Wed, 12 Feb 2020 16:10:12 +0530 Subject: [PATCH 06/33] Reuse pool for decompression and []byte in encode --- table/builder.go | 26 ++++++++++++++------------ table/builder_test.go | 10 ++++------ y/iterator.go | 8 ++++++-- 3 files changed, 24 insertions(+), 20 deletions(-) diff --git a/table/builder.go b/table/builder.go index 6fef318cb..379606125 100644 --- a/table/builder.go +++ b/table/builder.go @@ -20,6 +20,7 @@ import ( "bytes" "crypto/aes" "math" + "runtime" "sync" "unsafe" @@ -76,7 +77,6 @@ type Builder struct { // Typically tens or hundreds of meg. This is for one single file. buf []byte sz int - bb bytes.Buffer baseKey []byte // Base key for the current block. baseOffset uint32 // Offset for the current block. @@ -104,7 +104,7 @@ func NewTableBuilder(opts Options) *Builder { inChan: make(chan *bblock, 1000), } - count := 4 //runtime.NumCPU() + count := runtime.NumCPU() b.inCloser.Add(count) for i := 0; i < count; i++ { go b.handleBlock(i) @@ -112,6 +112,8 @@ func NewTableBuilder(opts Options) *Builder { return b } +var slicePool = sync.Pool{New: func() interface{} { return make([]byte, 0, 100) }} + func (b *Builder) handleBlock(i int) { defer b.inCloser.Done() for item := range b.inChan { @@ -119,12 +121,15 @@ func (b *Builder) handleBlock(i int) { // fmt.Println(uid, "-routine", i, "Processing", item.idx, "start", item.start, "with end", item.end) // Extract the item blockBuf := item.data[item.start:item.end] + var dst []byte // Compress the block. if b.opt.Compression != options.None { var err error // TODO: Find a way to reuse buffers. Current implementation creates a // new buffer for each compressData call. - blockBuf, err = b.compressData(blockBuf) + dst = slicePool.Get().([]byte) + dst = dst[:0] + blockBuf, err = b.compressData(dst, blockBuf) y.Check(err) } if b.shouldEncrypt() { @@ -136,6 +141,8 @@ func (b *Builder) handleBlock(i int) { // THIS IS IMPORTAN!!!!! copy(b.buf[item.start:], blockBuf) + slicePool.Put(dst) + newend := item.start + uint32(len(blockBuf)) item.end = newend } @@ -189,13 +196,8 @@ func (b *Builder) addHelper(key []byte, v y.ValueStruct, vpLen uint64) { b.append(diffKey) //b.buf = append(b.buf, diffKey...) - bb := &b.bb - bb.Reset() - v.EncodeTo(bb) - //b.sz += v.EncodeTo1(b.buf, b.sz) + b.sz += v.Encode(b.buf[b.sz:]) - b.append(bb.Bytes()) - //b.buf = append(b.buf, bb.Bytes()...) // Size of KV on SST. sstSz := uint64(uint32(headerSize) + uint32(len(diffKey)) + v.EncodedSize()) // Total estimated size = size on SST + size on vlog (length of value pointer). @@ -446,14 +448,14 @@ func (b *Builder) shouldEncrypt() bool { } // compressData compresses the given data. -func (b *Builder) compressData(data []byte) ([]byte, error) { +func (b *Builder) compressData(dst, data []byte) ([]byte, error) { switch b.opt.Compression { case options.None: return data, nil case options.Snappy: - return snappy.Encode(nil, data), nil + return snappy.Encode(dst, data), nil case options.ZSTD: - return y.ZSTDCompress(nil, data, b.opt.ZSTDCompressionLevel) + return y.ZSTDCompress(dst, data, b.opt.ZSTDCompressionLevel) } return nil, errors.New("Unsupported compression type") } diff --git a/table/builder_test.go b/table/builder_test.go index c8abc33be..6236373b7 100644 --- a/table/builder_test.go +++ b/table/builder_test.go @@ -32,7 +32,7 @@ import ( func TestTableIndex(t *testing.T) { rand.Seed(time.Now().Unix()) - keysCount := 100000 + keysCount := 1000000 key := make([]byte, 32) _, err := rand.Read(key) require.NoError(t, err) @@ -52,13 +52,13 @@ func TestTableIndex(t *testing.T) { { // Compression mode. name: "Only compression", - opts: Options{BlockSize: 4 * 1024, BloomFalsePositive: 0.01, Compression: options.ZSTD}, + opts: Options{BlockSize: 4 * 1024, BloomFalsePositive: 0.01, Compression: options.ZSTD, ZSTDCompressionLevel: 3}, }, { // Compression mode and encryption. name: "Compression and encryption", opts: Options{BlockSize: 4 * 1024, BloomFalsePositive: 0.01, Compression: options.ZSTD, - DataKey: &pb.DataKey{Data: key}}, + ZSTDCompressionLevel: 3, DataKey: &pb.DataKey{Data: key}}, }, } @@ -74,9 +74,7 @@ func TestTableIndex(t *testing.T) { blockCount := 0 for i := 0; i < keysCount; i++ { k := []byte(fmt.Sprintf("%016x", i)) - v := make([]byte, 10) - rand.Read(v) - //v := fmt.Sprintf("%d", i) + v := fmt.Sprintf("%d", i) vs := y.ValueStruct{Value: []byte(v)} if i == 0 { // This is first key for first block. blockFirstKeys = append(blockFirstKeys, k) diff --git a/y/iterator.go b/y/iterator.go index 6d0f677c0..034e7f98d 100644 --- a/y/iterator.go +++ b/y/iterator.go @@ -19,6 +19,7 @@ package y import ( "bytes" "encoding/binary" + "fmt" ) // ValueStruct represents the value info that can be associated with a key, but also the internal @@ -64,11 +65,13 @@ func (v *ValueStruct) Decode(b []byte) { } // Encode expects a slice of length at least v.EncodedSize(). -func (v *ValueStruct) Encode(b []byte) { +func (v *ValueStruct) Encode(b []byte) int { b[0] = v.Meta b[1] = v.UserMeta sz := binary.PutUvarint(b[2:], v.ExpiresAt) - copy(b[2+sz:], v.Value) + n := copy(b[2+sz:], v.Value) + // fmt.Println("sz", 2+sz+n) + return 2 + sz + n } // EncodeTo should be kept in sync with the Encode function above. The reason @@ -79,6 +82,7 @@ func (v *ValueStruct) EncodeTo(buf *bytes.Buffer) { buf.WriteByte(v.UserMeta) var enc [binary.MaxVarintLen64]byte sz := binary.PutUvarint(enc[:], v.ExpiresAt) + fmt.Println("sz", sz) buf.Write(enc[:sz]) buf.Write(v.Value) } From 15ae6905ee3b79c23cf20ea9b79553d8dbf55375 Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Wed, 12 Feb 2020 19:07:02 +0530 Subject: [PATCH 07/33] fix benchmark --- table/builder_test.go | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/table/builder_test.go b/table/builder_test.go index 6236373b7..751962c6f 100644 --- a/table/builder_test.go +++ b/table/builder_test.go @@ -32,7 +32,7 @@ import ( func TestTableIndex(t *testing.T) { rand.Seed(time.Now().Unix()) - keysCount := 1000000 + keysCount := 10000000 key := make([]byte, 32) _, err := rand.Read(key) require.NoError(t, err) @@ -42,28 +42,29 @@ func TestTableIndex(t *testing.T) { }{ { name: "No encyption/compression", - opts: Options{BlockSize: 4 * 1024, BloomFalsePositive: 0.01}, + opts: Options{BlockSize: 4 * 1024, BloomFalsePositive: 0.01, TableSize: 200 << 20}, }, { // Encryption mode. name: "Only encryption", - opts: Options{BlockSize: 4 * 1024, BloomFalsePositive: 0.01, DataKey: &pb.DataKey{Data: key}}, + opts: Options{BlockSize: 4 * 1024, BloomFalsePositive: 0.01, TableSize: 200 << 20, DataKey: &pb.DataKey{Data: key}}, }, { // Compression mode. name: "Only compression", - opts: Options{BlockSize: 4 * 1024, BloomFalsePositive: 0.01, Compression: options.ZSTD, ZSTDCompressionLevel: 3}, + opts: Options{BlockSize: 4 * 1024, BloomFalsePositive: 0.01, TableSize: 200 << 20, Compression: options.ZSTD, ZSTDCompressionLevel: 3}, }, { // Compression mode and encryption. name: "Compression and encryption", - opts: Options{BlockSize: 4 * 1024, BloomFalsePositive: 0.01, Compression: options.ZSTD, + opts: Options{BlockSize: 4 * 1024, BloomFalsePositive: 0.01, TableSize: 200 << 20, Compression: options.ZSTD, ZSTDCompressionLevel: 3, DataKey: &pb.DataKey{Data: key}}, }, } for _, tt := range subTest { t.Run(tt.name, func(t *testing.T) { + start := time.Now() opt := tt.opts builder := NewTableBuilder(opt) filename := fmt.Sprintf("%s%c%d.sst", os.TempDir(), os.PathSeparator, rand.Uint32()) @@ -103,6 +104,7 @@ func TestTableIndex(t *testing.T) { } f.Close() require.NoError(t, os.RemoveAll(filename)) + fmt.Println("time taken", time.Since(start)) }) } } @@ -135,8 +137,12 @@ func BenchmarkBuilder(b *testing.B) { keysCount := 1300000 // This number of entries consumes ~64MB of memory. + var keyList [][]byte + for i := 0; i < keysCount; i++ { + keyList = append(keyList, key(i)) + } bench := func(b *testing.B, opt *Options) { - // KeyCount * (keySize + ValSize) + b.ResetTimer() b.SetBytes(int64(keysCount) * (32 + 32)) for i := 0; i < b.N; i++ { opt.BlockSize = 4 * 1024 @@ -144,7 +150,7 @@ func BenchmarkBuilder(b *testing.B) { builder := NewTableBuilder(*opt) for i := 0; i < keysCount; i++ { - builder.Add(key(i), vs, 0) + builder.Add(keyList[i], vs, 0) } _ = builder.Finish() From 93963ad8cf49b06c26ea46649ee880b2b375d2c9 Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Wed, 12 Feb 2020 19:14:04 +0530 Subject: [PATCH 08/33] performance issues. One go routine faster than multiple go routines --- table/builder.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/table/builder.go b/table/builder.go index 379606125..09abc696c 100644 --- a/table/builder.go +++ b/table/builder.go @@ -19,8 +19,8 @@ package table import ( "bytes" "crypto/aes" + "fmt" "math" - "runtime" "sync" "unsafe" @@ -104,7 +104,8 @@ func NewTableBuilder(opts Options) *Builder { inChan: make(chan *bblock, 1000), } - count := runtime.NumCPU() + count := 1 //runtime.NumCPU() + fmt.Println("with count", count, "chanlen", cap(b.inChan)) b.inCloser.Add(count) for i := 0; i < count; i++ { go b.handleBlock(i) @@ -344,6 +345,7 @@ func (b *Builder) Finish() []byte { // Wait for handler to finish b.inCloser.Wait() + //fmt.Println("num of blocks", len(b.tableIndex.Offsets)) start := uint32(0) for i, bl := range b.blockList { b.tableIndex.Offsets[i].Len = bl.end - bl.start From 37d0c0aea5b3a156bc0bf24cabfeb7109f10f058 Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Fri, 21 Feb 2020 17:47:48 +0530 Subject: [PATCH 09/33] Dont use go routines for no compression/encryption --- table/builder.go | 47 ++++++++++++++++++++++++++++++------------- table/builder_test.go | 4 ++-- 2 files changed, 35 insertions(+), 16 deletions(-) diff --git a/table/builder.go b/table/builder.go index 09abc696c..123abef90 100644 --- a/table/builder.go +++ b/table/builder.go @@ -19,8 +19,8 @@ package table import ( "bytes" "crypto/aes" - "fmt" "math" + "runtime" "sync" "unsafe" @@ -101,11 +101,16 @@ func NewTableBuilder(opts Options) *Builder { tableIndex: &pb.TableIndex{}, keyHashes: make([]uint64, 0, 1024), // Avoid some malloc calls. opt: &opts, - inChan: make(chan *bblock, 1000), } - count := 1 //runtime.NumCPU() - fmt.Println("with count", count, "chanlen", cap(b.inChan)) + // If encryption or compression is not enabled, write directly to the buffer. + if b.opt.Compression == options.None && b.opt.DataKey == nil { + return b + } + + b.inChan = make(chan *bblock, 1000) + count := runtime.NumCPU() + //fmt.Println("with count", count, "chanlen", cap(b.inChan)) b.inCloser.Add(count) for i := 0; i < count; i++ { go b.handleBlock(i) @@ -235,8 +240,18 @@ func (b *Builder) finishBlock() { b.writeChecksum(b.buf[b.baseOffset:b.sz]) blockBuf := b.buf[b.baseOffset:b.sz] // Store checksum for current block. - //fmt.Println("cap ", len(b.buf), cap(b.buf)) + if b.opt.Compression == options.None && b.opt.DataKey == nil { + // Add key to the block index + bo := &pb.BlockOffset{ + Key: y.Copy(b.baseKey), + Offset: b.baseOffset, + Len: uint32(len(blockBuf)), + } + b.tableIndex.Offsets = append(b.tableIndex.Offsets, bo) + return + } padding := 200 + //fmt.Println("cap ", len(b.buf), cap(b.buf)) // Add 30 bytes of empty space //copy(b.buf[len(b.buf):], make([]byte, padding)) //fmt.Println("cap ", len(b.buf), cap(b.buf)) @@ -341,20 +356,24 @@ func (b *Builder) Finish() []byte { b.finishBlock() // This will never start a new block. - close(b.inChan) + if b.inChan != nil { + close(b.inChan) + } // Wait for handler to finish b.inCloser.Wait() //fmt.Println("num of blocks", len(b.tableIndex.Offsets)) - start := uint32(0) - for i, bl := range b.blockList { - b.tableIndex.Offsets[i].Len = bl.end - bl.start - b.tableIndex.Offsets[i].Offset = start - - copy(b.buf[start:], b.buf[bl.start:bl.end]) - start = bl.end + if len(b.blockList) > 0 { + start := uint32(0) + for i, bl := range b.blockList { + b.tableIndex.Offsets[i].Len = bl.end - bl.start + b.tableIndex.Offsets[i].Offset = start + + copy(b.buf[start:], b.buf[bl.start:bl.end]) + start = bl.end + } + b.buf = b.buf[:start] } - b.buf = b.buf[:start] index, err := proto.Marshal(b.tableIndex) y.Check(err) diff --git a/table/builder_test.go b/table/builder_test.go index 751962c6f..ca0e4f63f 100644 --- a/table/builder_test.go +++ b/table/builder_test.go @@ -149,8 +149,8 @@ func BenchmarkBuilder(b *testing.B) { opt.BloomFalsePositive = 0.01 builder := NewTableBuilder(*opt) - for i := 0; i < keysCount; i++ { - builder.Add(keyList[i], vs, 0) + for j := 0; j < keysCount; j++ { + builder.Add(keyList[j], vs, 0) } _ = builder.Finish() From 5795be5a1bf285858b4c35d8e416cc81b6cd813f Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Fri, 21 Feb 2020 20:08:15 +0530 Subject: [PATCH 10/33] All table tests work --- table/builder.go | 130 ++++++++++++++++++-------------------------- table/table_test.go | 5 -- 2 files changed, 54 insertions(+), 81 deletions(-) diff --git a/table/builder.go b/table/builder.go index 123abef90..88fb115b2 100644 --- a/table/builder.go +++ b/table/builder.go @@ -35,6 +35,11 @@ import ( "github.com/dgraph-io/ristretto/z" ) +const ( + KB = 1024 + MB = KB * 1024 +) + func newBuffer(sz int) *bytes.Buffer { b := new(bytes.Buffer) b.Grow(sz) @@ -66,10 +71,8 @@ func (h *header) Decode(buf []byte) { type bblock struct { data []byte key []byte - idx int - start uint32 - end uint32 - done bool + start uint32 // Points to the starting offset of the block. + end uint32 // Points to the end offset of the block. } // Builder is used in building a table. @@ -85,11 +88,9 @@ type Builder struct { keyHashes []uint64 // Used for building the bloomfilter. opt *Options - // Used to concurrently compress the blocks. - inCloser sync.WaitGroup - length uint32 - idx int - inChan chan *bblock + // Used to concurrently compress/encrypt blocks. + wg sync.WaitGroup + blockChan chan *bblock blockList []*bblock } @@ -97,21 +98,23 @@ type Builder struct { func NewTableBuilder(opts Options) *Builder { b := &Builder{ // Additional 200 bytes to store index (approximate). + // We trim the additional space in table.Finish(). buf: make([]byte, opts.TableSize+MB*200), tableIndex: &pb.TableIndex{}, keyHashes: make([]uint64, 0, 1024), // Avoid some malloc calls. opt: &opts, } - // If encryption or compression is not enabled, write directly to the buffer. + // If encryption or compression is not enabled, do not start compression/encryption goroutines + // and write directly to the buffer. if b.opt.Compression == options.None && b.opt.DataKey == nil { return b } - b.inChan = make(chan *bblock, 1000) + b.blockChan = make(chan *bblock, 1000) + count := runtime.NumCPU() - //fmt.Println("with count", count, "chanlen", cap(b.inChan)) - b.inCloser.Add(count) + b.wg.Add(count) for i := 0; i < count; i++ { go b.handleBlock(i) } @@ -121,20 +124,18 @@ func NewTableBuilder(opts Options) *Builder { var slicePool = sync.Pool{New: func() interface{} { return make([]byte, 0, 100) }} func (b *Builder) handleBlock(i int) { - defer b.inCloser.Done() - for item := range b.inChan { - // uid := uuid.New() - // fmt.Println(uid, "-routine", i, "Processing", item.idx, "start", item.start, "with end", item.end) + defer b.wg.Done() + for item := range b.blockChan { // Extract the item blockBuf := item.data[item.start:item.end] var dst []byte // Compress the block. if b.opt.Compression != options.None { var err error - // TODO: Find a way to reuse buffers. Current implementation creates a - // new buffer for each compressData call. + dst = slicePool.Get().([]byte) dst = dst[:0] + blockBuf, err = b.compressData(dst, blockBuf) y.Check(err) } @@ -144,7 +145,7 @@ func (b *Builder) handleBlock(i int) { blockBuf = eBlock } - // THIS IS IMPORTAN!!!!! + // Copy over compressed/encrypted data back to the main buffer. copy(b.buf[item.start:], blockBuf) slicePool.Put(dst) @@ -194,13 +195,9 @@ func (b *Builder) addHelper(key []byte, v y.ValueStruct, vpLen uint64) { y.AssertTrue(uint32(b.sz) < math.MaxUint32) b.entryOffsets = append(b.entryOffsets, uint32(b.sz)-b.baseOffset) - // fmt.Println("cap of b.buf", cap(b.buf[len(b.buf):])) - // Layout: header, diffKey, value. b.append(h.Encode()) - //b.buf = append(b.buf, h.Encode()...) b.append(diffKey) - //b.buf = append(b.buf, diffKey...) b.sz += v.Encode(b.buf[b.sz:]) @@ -215,6 +212,10 @@ func (b *Builder) append(data []byte) { b.sz += len(data) } +func (b *Builder) addPadding(sz int) { + b.sz += sz +} + /* Structure of Block. +-------------------+---------------------+--------------------+--------------+------------------+ @@ -228,53 +229,41 @@ Structure of Block. */ // In case the data is encrypted, the "IV" is added to the end of the block. func (b *Builder) finishBlock() { - //copy(b.buf[len(b.buf):], y.U32SliceToBytes(b.entryOffsets)) - //copy(b.buf[len(b.buf):], y.U32ToBytes(uint32(len(b.entryOffsets)))) - //b.buf = append(b.buf, y.U32SliceToBytes(b.entryOffsets)...) - //spew.Dump(b.entryOffsets) b.append(y.U32SliceToBytes(b.entryOffsets)) - //b.buf = append(b.buf, y.U32ToBytes(uint32(len(b.entryOffsets)))...) b.append(y.U32ToBytes(uint32(len(b.entryOffsets)))) - //fmt.Println("cap ", len(b.buf), cap(b.buf)) b.writeChecksum(b.buf[b.baseOffset:b.sz]) - blockBuf := b.buf[b.baseOffset:b.sz] // Store checksum for current block. - if b.opt.Compression == options.None && b.opt.DataKey == nil { - // Add key to the block index - bo := &pb.BlockOffset{ - Key: y.Copy(b.baseKey), - Offset: b.baseOffset, - Len: uint32(len(blockBuf)), - } - b.tableIndex.Offsets = append(b.tableIndex.Offsets, bo) + // If compression/encryption is disabled, no need to send the block to the blockChan. + // There's nothing to be done. + if b.blockChan == nil { + b.addBlockToIndex() return } + + // When a block is encrypted, it's length increases. We add 200 bytes to padding to + // handle cases when block size increases. padding := 200 - //fmt.Println("cap ", len(b.buf), cap(b.buf)) - // Add 30 bytes of empty space - //copy(b.buf[len(b.buf):], make([]byte, padding)) - //fmt.Println("cap ", len(b.buf), cap(b.buf)) - - b.append(make([]byte, padding)) - //b.buf = append(b.buf, make([]byte, padding)...) - - // Add 30 bytes of empty space - //====================================================== - //block := &bblock{idx: b.idx, start: b.baseOffset, end: uint32(len(b.buf) - padding), data: b.buf} - block := &bblock{idx: b.idx, start: b.baseOffset, end: uint32(b.sz - padding), data: b.buf} - b.idx++ + b.addPadding(padding) + + // Block end is the actual end of the block ignoring the padding. + block := &bblock{start: b.baseOffset, end: uint32(b.sz - padding), data: b.buf} b.blockList = append(b.blockList, block) - // Add key to the block index + b.addBlockToIndex() + // Push to the block handler. + b.blockChan <- block +} + +func (b *Builder) addBlockToIndex() { + blockBuf := b.buf[b.baseOffset:b.sz] + // Add key to the block index. bo := &pb.BlockOffset{ Key: y.Copy(b.baseKey), Offset: b.baseOffset, Len: uint32(len(blockBuf)), } b.tableIndex.Offsets = append(b.tableIndex.Offsets, bo) - // Push to the block handler. - b.inChan <- block } func (b *Builder) shouldFinishBlock(key []byte, value y.ValueStruct) bool { @@ -356,21 +345,24 @@ func (b *Builder) Finish() []byte { b.finishBlock() // This will never start a new block. - if b.inChan != nil { - close(b.inChan) + if b.blockChan != nil { + close(b.blockChan) } - // Wait for handler to finish - b.inCloser.Wait() + // Wait for block handler to finish. + b.wg.Wait() - //fmt.Println("num of blocks", len(b.tableIndex.Offsets)) + // Fix block boundaries. This includes moving the blocks so that we + // don't have any interleaving space between them. if len(b.blockList) > 0 { start := uint32(0) for i, bl := range b.blockList { + // Length of the block is start minues the end. b.tableIndex.Offsets[i].Len = bl.end - bl.start b.tableIndex.Offsets[i].Offset = start + // Copy over the block to the corrent position in the main buffer. copy(b.buf[start:], b.buf[bl.start:bl.end]) - start = bl.end + start = b.tableIndex.Offsets[i].Offset + b.tableIndex.Offsets[i].Len } b.buf = b.buf[:start] } @@ -382,24 +374,12 @@ func (b *Builder) Finish() []byte { index, err = b.encrypt(index) y.Check(err) } - //b.append(index) b.buf = append(b.buf, index...) - // Write index the file. + // Write index the buffer. b.buf = append(b.buf, y.U32ToBytes(uint32(len(index)))...) - //b.append(y.U32ToBytes(uint32(len(index)))) - //b.append(make([]byte, 10)) - //b.writeChecksum(index) // Build checksum for the index. checksum := pb.Checksum{ - // TODO: The checksum type should be configurable from the - // options. - // We chose to use CRC32 as the default option because - // it performed better compared to xxHash64. - // See the BenchmarkChecksum in table_test.go file - // Size => 1024 B 2048 B - // CRC32 => 63.7 ns/op 112 ns/op - // xxHash64 => 87.5 ns/op 158 ns/op Sum: y.CalculateChecksum(index, pb.Checksum_CRC32C), Algo: pb.Checksum_CRC32C, } @@ -408,10 +388,8 @@ func (b *Builder) Finish() []byte { chksum, err := proto.Marshal(&checksum) y.Check(err) b.buf = append(b.buf, chksum...) - //b.buf = append(b.buf, chksum...) // Write checksum size. - //b.buf = append(b.buf, y.U32ToBytes(uint32(len(chksum)))...) b.buf = append(b.buf, y.U32ToBytes(uint32(len(chksum)))...) return b.buf } diff --git a/table/table_test.go b/table/table_test.go index 493c04b56..c00507034 100644 --- a/table/table_test.go +++ b/table/table_test.go @@ -35,11 +35,6 @@ import ( "github.com/stretchr/testify/require" ) -const ( - KB = 1024 - MB = KB * 1024 -) - func key(prefix string, i int) string { return prefix + fmt.Sprintf("%04d", i) } From b86d1b1fc506ed2be0ec15a4bed425a6f04a2ab0 Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Fri, 21 Feb 2020 20:30:31 +0530 Subject: [PATCH 11/33] Reduce memory consumption --- table/builder.go | 4 ++-- table/builder_test.go | 2 +- table/table_test.go | 3 ++- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/table/builder.go b/table/builder.go index 88fb115b2..a22be05b4 100644 --- a/table/builder.go +++ b/table/builder.go @@ -97,9 +97,9 @@ type Builder struct { // NewTableBuilder makes a new TableBuilder. func NewTableBuilder(opts Options) *Builder { b := &Builder{ - // Additional 200 bytes to store index (approximate). + // Additional 2 MB to store index (approximate). // We trim the additional space in table.Finish(). - buf: make([]byte, opts.TableSize+MB*200), + buf: make([]byte, opts.TableSize+MB*2), tableIndex: &pb.TableIndex{}, keyHashes: make([]uint64, 0, 1024), // Avoid some malloc calls. opt: &opts, diff --git a/table/builder_test.go b/table/builder_test.go index ca0e4f63f..edcfebecb 100644 --- a/table/builder_test.go +++ b/table/builder_test.go @@ -32,7 +32,7 @@ import ( func TestTableIndex(t *testing.T) { rand.Seed(time.Now().Unix()) - keysCount := 10000000 + keysCount := 100000 key := make([]byte, 32) _, err := rand.Read(key) require.NoError(t, err) diff --git a/table/table_test.go b/table/table_test.go index c00507034..46bd94d50 100644 --- a/table/table_test.go +++ b/table/table_test.go @@ -696,7 +696,8 @@ func TestTableBigValues(t *testing.T) { require.NoError(t, err, "unable to create file") n := 100 // Insert 100 keys. - opts := Options{Compression: options.ZSTD, BlockSize: 4 * 1024, BloomFalsePositive: 0.01} + opts := Options{Compression: options.ZSTD, BlockSize: 4 * 1024, BloomFalsePositive: 0.01, + TableSize: uint64(n) * 1 << 20} builder := NewTableBuilder(opts) for i := 0; i < n; i++ { key := y.KeyWithTs([]byte(key("", i)), 0) From 01f304ef5d5900e0b74931e7d496f30f53b9100f Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Fri, 21 Feb 2020 21:29:09 +0530 Subject: [PATCH 12/33] fix test --- db2_test.go | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/db2_test.go b/db2_test.go index d9952af45..362f6c3f7 100644 --- a/db2_test.go +++ b/db2_test.go @@ -668,16 +668,13 @@ func TestL0GCBug(t *testing.T) { // Simulate a crash by not closing db1 but releasing the locks. if db1.dirLockGuard != nil { require.NoError(t, db1.dirLockGuard.release()) + db1.dirLockGuard = nil } if db1.valueDirGuard != nil { require.NoError(t, db1.valueDirGuard.release()) + db1.valueDirGuard = nil } - for _, f := range db1.vlog.filesMap { - require.NoError(t, f.fd.Close()) - } - require.NoError(t, db1.registry.Close()) - require.NoError(t, db1.lc.close()) - require.NoError(t, db1.manifest.close()) + require.NoError(t, db1.Close()) db2, err := Open(opts) require.NoError(t, err) From da3c4c48182afdd30adddb460d0f6b17658ecabb Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Mon, 24 Feb 2020 18:12:06 +0530 Subject: [PATCH 13/33] Cleanup --- table/builder.go | 31 +++++++++---------------------- table/builder_test.go | 22 +++++++++++----------- 2 files changed, 20 insertions(+), 33 deletions(-) diff --git a/table/builder.go b/table/builder.go index a22be05b4..e9580f57c 100644 --- a/table/builder.go +++ b/table/builder.go @@ -116,14 +116,14 @@ func NewTableBuilder(opts Options) *Builder { count := runtime.NumCPU() b.wg.Add(count) for i := 0; i < count; i++ { - go b.handleBlock(i) + go b.handleBlock() } return b } var slicePool = sync.Pool{New: func() interface{} { return make([]byte, 0, 100) }} -func (b *Builder) handleBlock(i int) { +func (b *Builder) handleBlock() { defer b.wg.Done() for item := range b.blockChan { // Extract the item @@ -360,11 +360,12 @@ func (b *Builder) Finish() []byte { b.tableIndex.Offsets[i].Len = bl.end - bl.start b.tableIndex.Offsets[i].Offset = start - // Copy over the block to the corrent position in the main buffer. + // Copy over the block to the current position in the main buffer. copy(b.buf[start:], b.buf[bl.start:bl.end]) start = b.tableIndex.Offsets[i].Offset + b.tableIndex.Offsets[i].Len } - b.buf = b.buf[:start] + // Start writing to the buffer from the point until which we have valid data + b.sz = int(start) } index, err := proto.Marshal(b.tableIndex) @@ -374,24 +375,12 @@ func (b *Builder) Finish() []byte { index, err = b.encrypt(index) y.Check(err) } - b.buf = append(b.buf, index...) // Write index the buffer. - b.buf = append(b.buf, y.U32ToBytes(uint32(len(index)))...) + b.append(index) + b.append(y.U32ToBytes(uint32(len(index)))) - // Build checksum for the index. - checksum := pb.Checksum{ - Sum: y.CalculateChecksum(index, pb.Checksum_CRC32C), - Algo: pb.Checksum_CRC32C, - } - - // Write checksum to the file. - chksum, err := proto.Marshal(&checksum) - y.Check(err) - b.buf = append(b.buf, chksum...) - - // Write checksum size. - b.buf = append(b.buf, y.U32ToBytes(uint32(len(chksum)))...) - return b.buf + b.writeChecksum(index) + return b.buf[:b.sz] } func (b *Builder) writeChecksum(data []byte) { @@ -413,10 +402,8 @@ func (b *Builder) writeChecksum(data []byte) { chksum, err := proto.Marshal(&checksum) y.Check(err) b.append(chksum) - //b.buf = append(b.buf, chksum...) // Write checksum size. - //b.buf = append(b.buf, y.U32ToBytes(uint32(len(chksum)))...) b.append(y.U32ToBytes(uint32(len(chksum)))) } diff --git a/table/builder_test.go b/table/builder_test.go index edcfebecb..9f09e3a26 100644 --- a/table/builder_test.go +++ b/table/builder_test.go @@ -42,22 +42,22 @@ func TestTableIndex(t *testing.T) { }{ { name: "No encyption/compression", - opts: Options{BlockSize: 4 * 1024, BloomFalsePositive: 0.01, TableSize: 200 << 20}, + opts: Options{BlockSize: 4 * 1024, BloomFalsePositive: 0.01, TableSize: 30 << 20}, }, { // Encryption mode. name: "Only encryption", - opts: Options{BlockSize: 4 * 1024, BloomFalsePositive: 0.01, TableSize: 200 << 20, DataKey: &pb.DataKey{Data: key}}, + opts: Options{BlockSize: 4 * 1024, BloomFalsePositive: 0.01, TableSize: 30 << 20, DataKey: &pb.DataKey{Data: key}}, }, { // Compression mode. name: "Only compression", - opts: Options{BlockSize: 4 * 1024, BloomFalsePositive: 0.01, TableSize: 200 << 20, Compression: options.ZSTD, ZSTDCompressionLevel: 3}, + opts: Options{BlockSize: 4 * 1024, BloomFalsePositive: 0.01, TableSize: 30 << 20, Compression: options.ZSTD, ZSTDCompressionLevel: 3}, }, { // Compression mode and encryption. name: "Compression and encryption", - opts: Options{BlockSize: 4 * 1024, BloomFalsePositive: 0.01, TableSize: 200 << 20, Compression: options.ZSTD, + opts: Options{BlockSize: 4 * 1024, BloomFalsePositive: 0.01, TableSize: 30 << 20, Compression: options.ZSTD, ZSTDCompressionLevel: 3, DataKey: &pb.DataKey{Data: key}}, }, } @@ -142,19 +142,19 @@ func BenchmarkBuilder(b *testing.B) { keyList = append(keyList, key(i)) } bench := func(b *testing.B, opt *Options) { - b.ResetTimer() b.SetBytes(int64(keysCount) * (32 + 32)) + opt.BlockSize = 4 * 1024 + opt.BloomFalsePositive = 0.01 + // TODO: Fix this. The size is dependant on benchtime and keycounts. + opt.TableSize = 5 << 30 + builder := NewTableBuilder(*opt) + b.ResetTimer() for i := 0; i < b.N; i++ { - opt.BlockSize = 4 * 1024 - opt.BloomFalsePositive = 0.01 - builder := NewTableBuilder(*opt) - for j := 0; j < keysCount; j++ { builder.Add(keyList[j], vs, 0) } - - _ = builder.Finish() } + _ = builder.Finish() } b.Run("no compression", func(b *testing.B) { From 269bdbd144df04d463f558e61f60f886451bc939 Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Mon, 24 Feb 2020 18:21:53 +0530 Subject: [PATCH 14/33] cleanup go.mod/sum --- go.mod | 2 -- go.sum | 2 -- 2 files changed, 4 deletions(-) diff --git a/go.mod b/go.mod index 8f0bed624..eae04e485 100644 --- a/go.mod +++ b/go.mod @@ -5,13 +5,11 @@ go 1.12 require ( github.com/DataDog/zstd v1.4.1 github.com/cespare/xxhash v1.1.0 - github.com/davecgh/go-spew v1.1.1 github.com/dgraph-io/ristretto v0.0.2-0.20200115201040-8f368f2f2ab3 github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 github.com/dustin/go-humanize v1.0.0 github.com/golang/protobuf v1.3.1 github.com/golang/snappy v0.0.1 - github.com/google/uuid v1.1.1 github.com/kr/pretty v0.1.0 // indirect github.com/pkg/errors v0.8.1 github.com/spaolacci/murmur3 v1.1.0 // indirect diff --git a/go.sum b/go.sum index d189d4baa..4c71dbdf4 100644 --- a/go.sum +++ b/go.sum @@ -24,8 +24,6 @@ github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= -github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= -github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= From 1f99b1328aacfc0d8ed68e2bc38ebb96f0ed5221 Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Mon, 24 Feb 2020 18:22:13 +0530 Subject: [PATCH 15/33] fixup --- table/builder_test.go | 35 +++++++++++++++++++++++++++-------- table/iterator.go | 1 + table/table.go | 2 +- y/iterator.go | 4 +--- 4 files changed, 30 insertions(+), 12 deletions(-) diff --git a/table/builder_test.go b/table/builder_test.go index 9f09e3a26..86f9b1fdf 100644 --- a/table/builder_test.go +++ b/table/builder_test.go @@ -42,29 +42,49 @@ func TestTableIndex(t *testing.T) { }{ { name: "No encyption/compression", - opts: Options{BlockSize: 4 * 1024, BloomFalsePositive: 0.01, TableSize: 30 << 20}, + opts: Options{ + BlockSize: 4 * 1024, + BloomFalsePositive: 0.01, + TableSize: 30 << 20, + }, }, { // Encryption mode. name: "Only encryption", - opts: Options{BlockSize: 4 * 1024, BloomFalsePositive: 0.01, TableSize: 30 << 20, DataKey: &pb.DataKey{Data: key}}, + opts: Options{ + BlockSize: 4 * 1024, + BloomFalsePositive: 0.01, + TableSize: 30 << 20, + DataKey: &pb.DataKey{Data: key}, + }, }, { // Compression mode. name: "Only compression", - opts: Options{BlockSize: 4 * 1024, BloomFalsePositive: 0.01, TableSize: 30 << 20, Compression: options.ZSTD, ZSTDCompressionLevel: 3}, + opts: Options{ + BlockSize: 4 * 1024, + BloomFalsePositive: 0.01, + TableSize: 30 << 20, + Compression: options.ZSTD, + ZSTDCompressionLevel: 3, + }, }, { // Compression mode and encryption. name: "Compression and encryption", - opts: Options{BlockSize: 4 * 1024, BloomFalsePositive: 0.01, TableSize: 30 << 20, Compression: options.ZSTD, - ZSTDCompressionLevel: 3, DataKey: &pb.DataKey{Data: key}}, + opts: Options{ + BlockSize: 4 * 1024, + BloomFalsePositive: 0.01, + TableSize: 30 << 20, + Compression: options.ZSTD, + ZSTDCompressionLevel: 3, + DataKey: &pb.DataKey{Data: key}, + }, }, } for _, tt := range subTest { t.Run(tt.name, func(t *testing.T) { - start := time.Now() opt := tt.opts builder := NewTableBuilder(opt) filename := fmt.Sprintf("%s%c%d.sst", os.TempDir(), os.PathSeparator, rand.Uint32()) @@ -92,7 +112,7 @@ func TestTableIndex(t *testing.T) { tbl, err := OpenTable(f, opt) require.NoError(t, err) if opt.DataKey == nil { - // key id is zero if thre is no datakey. + // key id is zero if there is no datakey. require.Equal(t, tbl.KeyID(), uint64(0)) } require.NoError(t, err, "unable to open table") @@ -104,7 +124,6 @@ func TestTableIndex(t *testing.T) { } f.Close() require.NoError(t, os.RemoveAll(filename)) - fmt.Println("time taken", time.Since(start)) }) } } diff --git a/table/iterator.go b/table/iterator.go index c3fb39963..33a99a8f9 100644 --- a/table/iterator.go +++ b/table/iterator.go @@ -60,6 +60,7 @@ func (itr *blockIterator) setIdx(i int) { } itr.err = nil startOffset := int(itr.entryOffsets[i]) + // Set base key. if len(itr.baseKey) == 0 { var baseHeader header diff --git a/table/table.go b/table/table.go index ba863c0fb..f96ae4359 100644 --- a/table/table.go +++ b/table/table.go @@ -49,7 +49,7 @@ const intSize = int(unsafe.Sizeof(int(0))) type Options struct { // Options for Opening/Building Table. - // Maximum size of the table + // Maximum size of the table. TableSize uint64 // ChkMode is the checksum verification mode for Table. diff --git a/y/iterator.go b/y/iterator.go index 034e7f98d..31e6021e7 100644 --- a/y/iterator.go +++ b/y/iterator.go @@ -19,7 +19,6 @@ package y import ( "bytes" "encoding/binary" - "fmt" ) // ValueStruct represents the value info that can be associated with a key, but also the internal @@ -70,7 +69,6 @@ func (v *ValueStruct) Encode(b []byte) int { b[1] = v.UserMeta sz := binary.PutUvarint(b[2:], v.ExpiresAt) n := copy(b[2+sz:], v.Value) - // fmt.Println("sz", 2+sz+n) return 2 + sz + n } @@ -82,7 +80,7 @@ func (v *ValueStruct) EncodeTo(buf *bytes.Buffer) { buf.WriteByte(v.UserMeta) var enc [binary.MaxVarintLen64]byte sz := binary.PutUvarint(enc[:], v.ExpiresAt) - fmt.Println("sz", sz) + buf.Write(enc[:sz]) buf.Write(v.Value) } From bd0ae87dc6e246700187c01ced1704a49e1769ae Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Mon, 2 Mar 2020 14:53:23 +0530 Subject: [PATCH 16/33] Use locks to protect buf copy --- table/builder.go | 27 +++++++++++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/table/builder.go b/table/builder.go index e9580f57c..865acd9d1 100644 --- a/table/builder.go +++ b/table/builder.go @@ -78,8 +78,9 @@ type bblock struct { // Builder is used in building a table. type Builder struct { // Typically tens or hundreds of meg. This is for one single file. - buf []byte - sz int + buf []byte + sz int + bufLock sync.Mutex baseKey []byte // Base key for the current block. baseOffset uint32 // Offset for the current block. @@ -145,8 +146,10 @@ func (b *Builder) handleBlock() { blockBuf = eBlock } + b.bufLock.Lock() // Copy over compressed/encrypted data back to the main buffer. copy(b.buf[item.start:], blockBuf) + b.bufLock.Unlock() slicePool.Put(dst) @@ -199,6 +202,10 @@ func (b *Builder) addHelper(key []byte, v y.ValueStruct, vpLen uint64) { b.append(h.Encode()) b.append(diffKey) + // Continue growing until we have enough space. + for uint32(len(b.buf)) < uint32(b.sz)+v.EncodedSize() { + b.grow() + } b.sz += v.Encode(b.buf[b.sz:]) // Size of KV on SST. @@ -207,13 +214,29 @@ func (b *Builder) addHelper(key []byte, v y.ValueStruct, vpLen uint64) { b.tableIndex.EstimatedSize += (sstSz + vpLen) } +// grow increases the size of b.buf by 10%. +func (b *Builder) grow() { + b.bufLock.Lock() + newBuf := make([]byte, len(b.buf)+len(b.buf)/10) + copy(newBuf, b.buf) + b.buf = newBuf + b.bufLock.Unlock() +} func (b *Builder) append(data []byte) { + // Continue growing until we have enough space. + for len(b.buf) < b.sz+len(data) { + b.grow() + } copy(b.buf[b.sz:], data) b.sz += len(data) } func (b *Builder) addPadding(sz int) { b.sz += sz + // Continue growing until we have enough space. + for len(b.buf) < b.sz { + b.grow() + } } /* From d807f051f662c1453183c804315c999fa690f58c Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Mon, 2 Mar 2020 14:53:23 +0530 Subject: [PATCH 17/33] Use locks to protect buf copy --- table/builder.go | 27 +++++++++++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/table/builder.go b/table/builder.go index e9580f57c..0e455d29b 100644 --- a/table/builder.go +++ b/table/builder.go @@ -78,8 +78,9 @@ type bblock struct { // Builder is used in building a table. type Builder struct { // Typically tens or hundreds of meg. This is for one single file. - buf []byte - sz int + buf []byte + sz int + bufLock sync.Mutex baseKey []byte // Base key for the current block. baseOffset uint32 // Offset for the current block. @@ -145,8 +146,10 @@ func (b *Builder) handleBlock() { blockBuf = eBlock } + b.bufLock.Lock() // Copy over compressed/encrypted data back to the main buffer. copy(b.buf[item.start:], blockBuf) + b.bufLock.Unlock() slicePool.Put(dst) @@ -199,6 +202,10 @@ func (b *Builder) addHelper(key []byte, v y.ValueStruct, vpLen uint64) { b.append(h.Encode()) b.append(diffKey) + // Continue growing until we have enough space. + for uint32(len(b.buf)) < uint32(b.sz)+v.EncodedSize() { + b.grow() + } b.sz += v.Encode(b.buf[b.sz:]) // Size of KV on SST. @@ -207,13 +214,29 @@ func (b *Builder) addHelper(key []byte, v y.ValueStruct, vpLen uint64) { b.tableIndex.EstimatedSize += (sstSz + vpLen) } +// grow increases the size of b.buf by 50%. +func (b *Builder) grow() { + b.bufLock.Lock() + newBuf := make([]byte, len(b.buf)+len(b.buf)/2) + copy(newBuf, b.buf) + b.buf = newBuf + b.bufLock.Unlock() +} func (b *Builder) append(data []byte) { + // Continue growing until we have enough space. + for len(b.buf) < b.sz+len(data) { + b.grow() + } copy(b.buf[b.sz:], data) b.sz += len(data) } func (b *Builder) addPadding(sz int) { b.sz += sz + // Continue growing until we have enough space. + for len(b.buf) < b.sz { + b.grow() + } } /* From 30a4c312258e62f9e2dd1e222b188f0fce3ae247 Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Mon, 2 Mar 2020 16:13:28 +0530 Subject: [PATCH 18/33] Remove unused key from bblock --- table/builder.go | 1 - 1 file changed, 1 deletion(-) diff --git a/table/builder.go b/table/builder.go index 0e455d29b..5dd48ee33 100644 --- a/table/builder.go +++ b/table/builder.go @@ -70,7 +70,6 @@ func (h *header) Decode(buf []byte) { type bblock struct { data []byte - key []byte start uint32 // Points to the starting offset of the block. end uint32 // Points to the end offset of the block. } From 833bae46d50e885f28880698f3dd7146123b02cb Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Mon, 2 Mar 2020 17:02:47 +0530 Subject: [PATCH 19/33] Fix benchmark --- table/builder_test.go | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/table/builder_test.go b/table/builder_test.go index 650d16827..5158a159a 100644 --- a/table/builder_test.go +++ b/table/builder_test.go @@ -164,16 +164,15 @@ func BenchmarkBuilder(b *testing.B) { b.SetBytes(int64(keysCount) * (32 + 32)) opt.BlockSize = 4 * 1024 opt.BloomFalsePositive = 0.01 - // TODO: Fix this. The size is dependant on benchtime and keycounts. - opt.TableSize = 5 << 30 - builder := NewTableBuilder(*opt) + opt.TableSize = 5 << 20 b.ResetTimer() for i := 0; i < b.N; i++ { + builder := NewTableBuilder(*opt) for j := 0; j < keysCount; j++ { builder.Add(keyList[j], vs, 0) } + _ = builder.Finish() } - _ = builder.Finish() } b.Run("no compression", func(b *testing.B) { @@ -181,6 +180,13 @@ func BenchmarkBuilder(b *testing.B) { opt.Compression = options.None bench(b, &opt) }) + b.Run("encryption", func(b *testing.B) { + var opt Options + key := make([]byte, 32) + rand.Read(key) + opt.DataKey = &pb.DataKey{Data: key} + bench(b, &opt) + }) b.Run("zstd compression", func(b *testing.B) { var opt Options opt.Compression = options.ZSTD From 7061c81f8af0e344354b82e9041d63dc9cfbf492 Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Mon, 2 Mar 2020 18:25:15 +0530 Subject: [PATCH 20/33] Fix slicepool usage --- table/builder.go | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/table/builder.go b/table/builder.go index 5dd48ee33..d76545427 100644 --- a/table/builder.go +++ b/table/builder.go @@ -17,7 +17,6 @@ package table import ( - "bytes" "crypto/aes" "math" "runtime" @@ -40,12 +39,6 @@ const ( MB = KB * 1024 ) -func newBuffer(sz int) *bytes.Buffer { - b := new(bytes.Buffer) - b.Grow(sz) - return b -} - type header struct { overlap uint16 // Overlap with base key. diff uint16 // Length of the diff. @@ -121,22 +114,28 @@ func NewTableBuilder(opts Options) *Builder { return b } -var slicePool = sync.Pool{New: func() interface{} { return make([]byte, 0, 100) }} +var slicePool = sync.Pool{ + New: func() interface{} { + // Make 4 KB blocks for reuse. + b := make([]byte, 0, 4<<10) + return &b + }, +} func (b *Builder) handleBlock() { defer b.wg.Done() for item := range b.blockChan { // Extract the item blockBuf := item.data[item.start:item.end] - var dst []byte + var dst *[]byte // Compress the block. if b.opt.Compression != options.None { var err error - dst = slicePool.Get().([]byte) - dst = dst[:0] + dst = slicePool.Get().(*[]byte) + *dst = (*dst)[:0] - blockBuf, err = b.compressData(dst, blockBuf) + blockBuf, err = b.compressData(*dst, blockBuf) y.Check(err) } if b.shouldEncrypt() { From 2fecb9df3be0e10be49bb6c299bcd7147a4dbe14 Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Mon, 2 Mar 2020 18:37:38 +0530 Subject: [PATCH 21/33] Fix slicepool usage --- table/builder.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/table/builder.go b/table/builder.go index d76545427..7f9dbbc3f 100644 --- a/table/builder.go +++ b/table/builder.go @@ -149,7 +149,9 @@ func (b *Builder) handleBlock() { copy(b.buf[item.start:], blockBuf) b.bufLock.Unlock() - slicePool.Put(dst) + if dst != nil { + slicePool.Put(dst) + } newend := item.start + uint32(len(blockBuf)) item.end = newend From 80146690598eb70bf7ad990da88e91fecdc5339d Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Tue, 3 Mar 2020 15:03:08 +0530 Subject: [PATCH 22/33] fix review comments --- table/builder.go | 60 +++++++++++++++++++++++++++++------------------- 1 file changed, 36 insertions(+), 24 deletions(-) diff --git a/table/builder.go b/table/builder.go index 7f9dbbc3f..e3e5fe43b 100644 --- a/table/builder.go +++ b/table/builder.go @@ -37,6 +37,10 @@ import ( const ( KB = 1024 MB = KB * 1024 + + // When a block is encrypted, it's length increases. We add 200 bytes of padding to + // handle cases when block size increases. This is an approximate number. + padding = 200 ) type header struct { @@ -72,7 +76,7 @@ type Builder struct { // Typically tens or hundreds of meg. This is for one single file. buf []byte sz int - bufLock sync.Mutex + bufLock sync.Mutex // This lock guards the buf. We acquire lock when we resize the buf. baseKey []byte // Base key for the current block. baseOffset uint32 // Offset for the current block. @@ -90,9 +94,9 @@ type Builder struct { // NewTableBuilder makes a new TableBuilder. func NewTableBuilder(opts Options) *Builder { b := &Builder{ - // Additional 2 MB to store index (approximate). + // Additional 5 MB to store index (approximate). // We trim the additional space in table.Finish(). - buf: make([]byte, opts.TableSize+MB*2), + buf: make([]byte, opts.TableSize+5*MB), tableIndex: &pb.TableIndex{}, keyHashes: make([]uint64, 0, 1024), // Avoid some malloc calls. opt: &opts, @@ -125,7 +129,7 @@ var slicePool = sync.Pool{ func (b *Builder) handleBlock() { defer b.wg.Done() for item := range b.blockChan { - // Extract the item + // Extract the block. blockBuf := item.data[item.start:item.end] var dst *[]byte // Compress the block. @@ -144,17 +148,27 @@ func (b *Builder) handleBlock() { blockBuf = eBlock } + // The newend should always be less than or equal to the original end + // plus the padding. If the new end is greater than item.end+padding + // that means the data from this block cannot be stored in its existing + // location and trying to copy it over would mean we would over-write + // some data of the next block. + y.AssertTruef(uint32(len(blockBuf)) <= item.end+padding, + "newend: %d item.end: %d padding: %d", len(blockBuf), item.end, padding) + + // Acquire the buflock here. The builder.grow function might change + // the b.buf while this goroutine was running. b.bufLock.Lock() // Copy over compressed/encrypted data back to the main buffer. copy(b.buf[item.start:], blockBuf) b.bufLock.Unlock() + // Fix the boundary of the block. + item.end = item.start + uint32(len(blockBuf)) + if dst != nil { slicePool.Put(dst) } - - newend := item.start + uint32(len(blockBuf)) - item.end = newend } } @@ -202,9 +216,8 @@ func (b *Builder) addHelper(key []byte, v y.ValueStruct, vpLen uint64) { b.append(h.Encode()) b.append(diffKey) - // Continue growing until we have enough space. - for uint32(len(b.buf)) < uint32(b.sz)+v.EncodedSize() { - b.grow() + if uint32(len(b.buf)) < uint32(b.sz)+v.EncodedSize() { + b.grow(int(v.EncodedSize())) } b.sz += v.Encode(b.buf[b.sz:]) @@ -214,29 +227,31 @@ func (b *Builder) addHelper(key []byte, v y.ValueStruct, vpLen uint64) { b.tableIndex.EstimatedSize += (sstSz + vpLen) } -// grow increases the size of b.buf by 50%. -func (b *Builder) grow() { +// grow increases the size of b.buf by atleast 50%. +func (b *Builder) grow(n int) { + if n < len(b.buf)/2 { + n = len(b.buf) / 2 + } b.bufLock.Lock() - newBuf := make([]byte, len(b.buf)+len(b.buf)/2) + newBuf := make([]byte, len(b.buf)+n) copy(newBuf, b.buf) b.buf = newBuf b.bufLock.Unlock() } func (b *Builder) append(data []byte) { - // Continue growing until we have enough space. - for len(b.buf) < b.sz+len(data) { - b.grow() + // Ensure we have enough space to store new data. + if len(b.buf) < b.sz+len(data) { + b.grow(len(data)) } copy(b.buf[b.sz:], data) b.sz += len(data) } func (b *Builder) addPadding(sz int) { - b.sz += sz - // Continue growing until we have enough space. - for len(b.buf) < b.sz { - b.grow() + if len(b.buf) < b.sz+sz { + b.grow(sz) } + b.sz += sz } /* @@ -264,9 +279,6 @@ func (b *Builder) finishBlock() { return } - // When a block is encrypted, it's length increases. We add 200 bytes to padding to - // handle cases when block size increases. - padding := 200 b.addPadding(padding) // Block end is the actual end of the block ignoring the padding. @@ -379,7 +391,7 @@ func (b *Builder) Finish() []byte { if len(b.blockList) > 0 { start := uint32(0) for i, bl := range b.blockList { - // Length of the block is start minues the end. + // Length of the block is start minus the end. b.tableIndex.Offsets[i].Len = bl.end - bl.start b.tableIndex.Offsets[i].Offset = start From 95dffd2d484771eacc39f5c24f34ee0428e9e2e2 Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Tue, 3 Mar 2020 15:36:35 +0530 Subject: [PATCH 23/33] Use dst to store final table --- table/builder.go | 35 +++++++++++++++++++++++------------ 1 file changed, 23 insertions(+), 12 deletions(-) diff --git a/table/builder.go b/table/builder.go index e3e5fe43b..a09cc7ada 100644 --- a/table/builder.go +++ b/table/builder.go @@ -386,21 +386,21 @@ func (b *Builder) Finish() []byte { // Wait for block handler to finish. b.wg.Wait() + dst := b.buf // Fix block boundaries. This includes moving the blocks so that we // don't have any interleaving space between them. if len(b.blockList) > 0 { - start := uint32(0) + // This will not allocate new memory. The underlying array has enough + // space to store the complete b.buf. + dst = dst[:0] for i, bl := range b.blockList { // Length of the block is start minus the end. b.tableIndex.Offsets[i].Len = bl.end - bl.start - b.tableIndex.Offsets[i].Offset = start + b.tableIndex.Offsets[i].Offset = uint32(len(dst)) - // Copy over the block to the current position in the main buffer. - copy(b.buf[start:], b.buf[bl.start:bl.end]) - start = b.tableIndex.Offsets[i].Offset + b.tableIndex.Offsets[i].Len + // Append next block. + dst = append(dst, b.buf[bl.start:bl.end]...) } - // Start writing to the buffer from the point until which we have valid data - b.sz = int(start) } index, err := proto.Marshal(b.tableIndex) @@ -410,12 +410,23 @@ func (b *Builder) Finish() []byte { index, err = b.encrypt(index) y.Check(err) } - // Write index the buffer. - b.append(index) - b.append(y.U32ToBytes(uint32(len(index)))) + // Write index to the buffer. + dst = append(dst, index...) + dst = append(dst, y.U32ToBytes(uint32(len(index)))...) - b.writeChecksum(index) - return b.buf[:b.sz] + // Build checksum for the index. + checksum := pb.Checksum{ + Sum: y.CalculateChecksum(index, pb.Checksum_CRC32C), + Algo: pb.Checksum_CRC32C, + } + chksum, err := proto.Marshal(&checksum) + y.Check(err) + // Write checksum to the buffer. + dst = append(dst, chksum...) + + // Write checksum size. + dst = append(dst, y.U32ToBytes(uint32(len(chksum)))...) + return dst } func (b *Builder) writeChecksum(data []byte) { From 0a506f06b4d4774d54b25a199c2fae00ee71f71b Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Tue, 3 Mar 2020 15:52:10 +0530 Subject: [PATCH 24/33] Reduce size of blockchan --- table/builder.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/table/builder.go b/table/builder.go index a09cc7ada..07b7bc627 100644 --- a/table/builder.go +++ b/table/builder.go @@ -108,9 +108,9 @@ func NewTableBuilder(opts Options) *Builder { return b } - b.blockChan = make(chan *bblock, 1000) + count := 2 * runtime.NumCPU() + b.blockChan = make(chan *bblock, count*2) - count := runtime.NumCPU() b.wg.Add(count) for i := 0; i < count; i++ { go b.handleBlock() From f869accd3f15dc8b61a2106af11643f839b93c8d Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Tue, 3 Mar 2020 17:07:44 +0530 Subject: [PATCH 25/33] Disable all stream writer tests --- stream_writer_test.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/stream_writer_test.go b/stream_writer_test.go index 1a1c2be1f..eab1afa82 100644 --- a/stream_writer_test.go +++ b/stream_writer_test.go @@ -51,6 +51,7 @@ func getSortedKVList(valueSize, listSize int) *pb.KVList { // check if we can read values after writing using stream writer func TestStreamWriter1(t *testing.T) { + t.Skip() test := func(t *testing.T, opts *Options) { runBadgerTest(t, opts, func(t *testing.T, db *DB) { // write entries using stream writer @@ -102,6 +103,7 @@ func TestStreamWriter1(t *testing.T) { // write more keys to db after writing keys using stream writer func TestStreamWriter2(t *testing.T) { + t.Skip() test := func(t *testing.T, opts *Options) { runBadgerTest(t, opts, func(t *testing.T, db *DB) { // write entries using stream writer @@ -164,6 +166,7 @@ func TestStreamWriter2(t *testing.T) { } func TestStreamWriter3(t *testing.T) { + t.Skip() test := func(t *testing.T, opts *Options) { runBadgerTest(t, opts, func(t *testing.T, db *DB) { // write entries using stream writer @@ -259,6 +262,7 @@ func TestStreamWriter3(t *testing.T) { // Oracle reinitialization is happening. Try commenting line 171 in stream_writer.go with code // (sw.db.orc = newOracle(sw.db.opt), this test should fail. func TestStreamWriter4(t *testing.T) { + t.Skip() runBadgerTest(t, nil, func(t *testing.T, db *DB) { // first insert some entries in db for i := 0; i < 10; i++ { @@ -285,6 +289,7 @@ func TestStreamWriter4(t *testing.T) { } func TestStreamWriter5(t *testing.T) { + t.Skip() runBadgerTest(t, nil, func(t *testing.T, db *DB) { list := &pb.KVList{} @@ -323,6 +328,7 @@ func TestStreamWriter5(t *testing.T) { // This test tries to insert multiple equal keys(without version) and verifies // if those are going to same table. func TestStreamWriter6(t *testing.T) { + t.Skip() runBadgerTest(t, nil, func(t *testing.T, db *DB) { list := &pb.KVList{} str := []string{"a", "a", "b", "b", "c", "c"} @@ -362,6 +368,7 @@ func TestStreamWriter6(t *testing.T) { } func TestStreamDone(t *testing.T) { + t.Skip() runBadgerTest(t, nil, func(t *testing.T, db *DB) { sw := db.NewStreamWriter() require.NoError(t, sw.Prepare(), "sw.Prepare() failed") @@ -394,6 +401,7 @@ func TestStreamDone(t *testing.T) { } func TestSendOnClosedStream(t *testing.T) { + t.Skip() dir, err := ioutil.TempDir("", "badger-test") require.NoError(t, err) defer func() { @@ -441,6 +449,7 @@ func TestSendOnClosedStream(t *testing.T) { } func TestSendOnClosedStream2(t *testing.T) { + t.Skip() dir, err := ioutil.TempDir("", "badger-test") require.NoError(t, err) defer func() { @@ -485,6 +494,7 @@ func TestSendOnClosedStream2(t *testing.T) { } func TestStreamWriterEncrypted(t *testing.T) { + t.Skip() dir, err := ioutil.TempDir("", "badger-test") require.NoError(t, err) From 34af954bd75c3000cbd7412b7b3d026bd59ac911 Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Wed, 4 Mar 2020 14:52:58 +0530 Subject: [PATCH 26/33] Revert "Disable all stream writer tests" This reverts commit f869accd3f15dc8b61a2106af11643f839b93c8d. --- stream_writer_test.go | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/stream_writer_test.go b/stream_writer_test.go index eab1afa82..1a1c2be1f 100644 --- a/stream_writer_test.go +++ b/stream_writer_test.go @@ -51,7 +51,6 @@ func getSortedKVList(valueSize, listSize int) *pb.KVList { // check if we can read values after writing using stream writer func TestStreamWriter1(t *testing.T) { - t.Skip() test := func(t *testing.T, opts *Options) { runBadgerTest(t, opts, func(t *testing.T, db *DB) { // write entries using stream writer @@ -103,7 +102,6 @@ func TestStreamWriter1(t *testing.T) { // write more keys to db after writing keys using stream writer func TestStreamWriter2(t *testing.T) { - t.Skip() test := func(t *testing.T, opts *Options) { runBadgerTest(t, opts, func(t *testing.T, db *DB) { // write entries using stream writer @@ -166,7 +164,6 @@ func TestStreamWriter2(t *testing.T) { } func TestStreamWriter3(t *testing.T) { - t.Skip() test := func(t *testing.T, opts *Options) { runBadgerTest(t, opts, func(t *testing.T, db *DB) { // write entries using stream writer @@ -262,7 +259,6 @@ func TestStreamWriter3(t *testing.T) { // Oracle reinitialization is happening. Try commenting line 171 in stream_writer.go with code // (sw.db.orc = newOracle(sw.db.opt), this test should fail. func TestStreamWriter4(t *testing.T) { - t.Skip() runBadgerTest(t, nil, func(t *testing.T, db *DB) { // first insert some entries in db for i := 0; i < 10; i++ { @@ -289,7 +285,6 @@ func TestStreamWriter4(t *testing.T) { } func TestStreamWriter5(t *testing.T) { - t.Skip() runBadgerTest(t, nil, func(t *testing.T, db *DB) { list := &pb.KVList{} @@ -328,7 +323,6 @@ func TestStreamWriter5(t *testing.T) { // This test tries to insert multiple equal keys(without version) and verifies // if those are going to same table. func TestStreamWriter6(t *testing.T) { - t.Skip() runBadgerTest(t, nil, func(t *testing.T, db *DB) { list := &pb.KVList{} str := []string{"a", "a", "b", "b", "c", "c"} @@ -368,7 +362,6 @@ func TestStreamWriter6(t *testing.T) { } func TestStreamDone(t *testing.T) { - t.Skip() runBadgerTest(t, nil, func(t *testing.T, db *DB) { sw := db.NewStreamWriter() require.NoError(t, sw.Prepare(), "sw.Prepare() failed") @@ -401,7 +394,6 @@ func TestStreamDone(t *testing.T) { } func TestSendOnClosedStream(t *testing.T) { - t.Skip() dir, err := ioutil.TempDir("", "badger-test") require.NoError(t, err) defer func() { @@ -449,7 +441,6 @@ func TestSendOnClosedStream(t *testing.T) { } func TestSendOnClosedStream2(t *testing.T) { - t.Skip() dir, err := ioutil.TempDir("", "badger-test") require.NoError(t, err) defer func() { @@ -494,7 +485,6 @@ func TestSendOnClosedStream2(t *testing.T) { } func TestStreamWriterEncrypted(t *testing.T) { - t.Skip() dir, err := ioutil.TempDir("", "badger-test") require.NoError(t, err) From ac6d2c0a79720c453a6d393068af66e0bdcc4e76 Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Wed, 4 Mar 2020 19:38:07 +0530 Subject: [PATCH 27/33] Revert "fix review comments" This reverts commit 80146690598eb70bf7ad990da88e91fecdc5339d. Attempt to fix the build. --- table/builder.go | 60 +++++++++++++++++++----------------------------- 1 file changed, 24 insertions(+), 36 deletions(-) diff --git a/table/builder.go b/table/builder.go index 07b7bc627..45bd910cd 100644 --- a/table/builder.go +++ b/table/builder.go @@ -37,10 +37,6 @@ import ( const ( KB = 1024 MB = KB * 1024 - - // When a block is encrypted, it's length increases. We add 200 bytes of padding to - // handle cases when block size increases. This is an approximate number. - padding = 200 ) type header struct { @@ -76,7 +72,7 @@ type Builder struct { // Typically tens or hundreds of meg. This is for one single file. buf []byte sz int - bufLock sync.Mutex // This lock guards the buf. We acquire lock when we resize the buf. + bufLock sync.Mutex baseKey []byte // Base key for the current block. baseOffset uint32 // Offset for the current block. @@ -94,9 +90,9 @@ type Builder struct { // NewTableBuilder makes a new TableBuilder. func NewTableBuilder(opts Options) *Builder { b := &Builder{ - // Additional 5 MB to store index (approximate). + // Additional 2 MB to store index (approximate). // We trim the additional space in table.Finish(). - buf: make([]byte, opts.TableSize+5*MB), + buf: make([]byte, opts.TableSize+MB*2), tableIndex: &pb.TableIndex{}, keyHashes: make([]uint64, 0, 1024), // Avoid some malloc calls. opt: &opts, @@ -129,7 +125,7 @@ var slicePool = sync.Pool{ func (b *Builder) handleBlock() { defer b.wg.Done() for item := range b.blockChan { - // Extract the block. + // Extract the item blockBuf := item.data[item.start:item.end] var dst *[]byte // Compress the block. @@ -148,27 +144,17 @@ func (b *Builder) handleBlock() { blockBuf = eBlock } - // The newend should always be less than or equal to the original end - // plus the padding. If the new end is greater than item.end+padding - // that means the data from this block cannot be stored in its existing - // location and trying to copy it over would mean we would over-write - // some data of the next block. - y.AssertTruef(uint32(len(blockBuf)) <= item.end+padding, - "newend: %d item.end: %d padding: %d", len(blockBuf), item.end, padding) - - // Acquire the buflock here. The builder.grow function might change - // the b.buf while this goroutine was running. b.bufLock.Lock() // Copy over compressed/encrypted data back to the main buffer. copy(b.buf[item.start:], blockBuf) b.bufLock.Unlock() - // Fix the boundary of the block. - item.end = item.start + uint32(len(blockBuf)) - if dst != nil { slicePool.Put(dst) } + + newend := item.start + uint32(len(blockBuf)) + item.end = newend } } @@ -216,8 +202,9 @@ func (b *Builder) addHelper(key []byte, v y.ValueStruct, vpLen uint64) { b.append(h.Encode()) b.append(diffKey) - if uint32(len(b.buf)) < uint32(b.sz)+v.EncodedSize() { - b.grow(int(v.EncodedSize())) + // Continue growing until we have enough space. + for uint32(len(b.buf)) < uint32(b.sz)+v.EncodedSize() { + b.grow() } b.sz += v.Encode(b.buf[b.sz:]) @@ -227,31 +214,29 @@ func (b *Builder) addHelper(key []byte, v y.ValueStruct, vpLen uint64) { b.tableIndex.EstimatedSize += (sstSz + vpLen) } -// grow increases the size of b.buf by atleast 50%. -func (b *Builder) grow(n int) { - if n < len(b.buf)/2 { - n = len(b.buf) / 2 - } +// grow increases the size of b.buf by 50%. +func (b *Builder) grow() { b.bufLock.Lock() - newBuf := make([]byte, len(b.buf)+n) + newBuf := make([]byte, len(b.buf)+len(b.buf)/2) copy(newBuf, b.buf) b.buf = newBuf b.bufLock.Unlock() } func (b *Builder) append(data []byte) { - // Ensure we have enough space to store new data. - if len(b.buf) < b.sz+len(data) { - b.grow(len(data)) + // Continue growing until we have enough space. + for len(b.buf) < b.sz+len(data) { + b.grow() } copy(b.buf[b.sz:], data) b.sz += len(data) } func (b *Builder) addPadding(sz int) { - if len(b.buf) < b.sz+sz { - b.grow(sz) - } b.sz += sz + // Continue growing until we have enough space. + for len(b.buf) < b.sz { + b.grow() + } } /* @@ -279,6 +264,9 @@ func (b *Builder) finishBlock() { return } + // When a block is encrypted, it's length increases. We add 200 bytes to padding to + // handle cases when block size increases. + padding := 200 b.addPadding(padding) // Block end is the actual end of the block ignoring the padding. @@ -394,7 +382,7 @@ func (b *Builder) Finish() []byte { // space to store the complete b.buf. dst = dst[:0] for i, bl := range b.blockList { - // Length of the block is start minus the end. + // Length of the block is start minues the end. b.tableIndex.Offsets[i].Len = bl.end - bl.start b.tableIndex.Offsets[i].Offset = uint32(len(dst)) From 19d9717098ab7bf370cb5dd60ee1d498a686fb6d Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Thu, 5 Mar 2020 16:35:43 +0530 Subject: [PATCH 28/33] Revert "Reduce size of blockchan" This reverts commit 0a506f06b4d4774d54b25a199c2fae00ee71f71b. --- table/builder.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/table/builder.go b/table/builder.go index 45bd910cd..2e3756b7b 100644 --- a/table/builder.go +++ b/table/builder.go @@ -104,9 +104,9 @@ func NewTableBuilder(opts Options) *Builder { return b } - count := 2 * runtime.NumCPU() - b.blockChan = make(chan *bblock, count*2) + b.blockChan = make(chan *bblock, 1000) + count := runtime.NumCPU() b.wg.Add(count) for i := 0; i < count; i++ { go b.handleBlock() From d1b850fbfe8d82b29ab5ceffa10579bcbd28bc4e Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Thu, 5 Mar 2020 16:54:47 +0530 Subject: [PATCH 29/33] Revert "Use dst to store final table" This reverts commit 95dffd2d484771eacc39f5c24f34ee0428e9e2e2. Attempt to fix the build. --- table/builder.go | 35 ++++++++++++----------------------- 1 file changed, 12 insertions(+), 23 deletions(-) diff --git a/table/builder.go b/table/builder.go index 2e3756b7b..7f9dbbc3f 100644 --- a/table/builder.go +++ b/table/builder.go @@ -374,21 +374,21 @@ func (b *Builder) Finish() []byte { // Wait for block handler to finish. b.wg.Wait() - dst := b.buf // Fix block boundaries. This includes moving the blocks so that we // don't have any interleaving space between them. if len(b.blockList) > 0 { - // This will not allocate new memory. The underlying array has enough - // space to store the complete b.buf. - dst = dst[:0] + start := uint32(0) for i, bl := range b.blockList { // Length of the block is start minues the end. b.tableIndex.Offsets[i].Len = bl.end - bl.start - b.tableIndex.Offsets[i].Offset = uint32(len(dst)) + b.tableIndex.Offsets[i].Offset = start - // Append next block. - dst = append(dst, b.buf[bl.start:bl.end]...) + // Copy over the block to the current position in the main buffer. + copy(b.buf[start:], b.buf[bl.start:bl.end]) + start = b.tableIndex.Offsets[i].Offset + b.tableIndex.Offsets[i].Len } + // Start writing to the buffer from the point until which we have valid data + b.sz = int(start) } index, err := proto.Marshal(b.tableIndex) @@ -398,23 +398,12 @@ func (b *Builder) Finish() []byte { index, err = b.encrypt(index) y.Check(err) } - // Write index to the buffer. - dst = append(dst, index...) - dst = append(dst, y.U32ToBytes(uint32(len(index)))...) + // Write index the buffer. + b.append(index) + b.append(y.U32ToBytes(uint32(len(index)))) - // Build checksum for the index. - checksum := pb.Checksum{ - Sum: y.CalculateChecksum(index, pb.Checksum_CRC32C), - Algo: pb.Checksum_CRC32C, - } - chksum, err := proto.Marshal(&checksum) - y.Check(err) - // Write checksum to the buffer. - dst = append(dst, chksum...) - - // Write checksum size. - dst = append(dst, y.U32ToBytes(uint32(len(chksum)))...) - return dst + b.writeChecksum(index) + return b.buf[:b.sz] } func (b *Builder) writeChecksum(data []byte) { From c6cd35fea3468dbd1891e045f677fe92e475f5ba Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Thu, 5 Mar 2020 17:40:57 +0530 Subject: [PATCH 30/33] Revert "Revert "Reduce size of blockchan"" This reverts commit 19d9717098ab7bf370cb5dd60ee1d498a686fb6d. --- table/builder.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/table/builder.go b/table/builder.go index 7f9dbbc3f..9424536f2 100644 --- a/table/builder.go +++ b/table/builder.go @@ -104,9 +104,9 @@ func NewTableBuilder(opts Options) *Builder { return b } - b.blockChan = make(chan *bblock, 1000) + count := 2 * runtime.NumCPU() + b.blockChan = make(chan *bblock, count*2) - count := runtime.NumCPU() b.wg.Add(count) for i := 0; i < count; i++ { go b.handleBlock() From 10e0c2b858c7324e0aca17a20cdc3b48569b087b Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Thu, 5 Mar 2020 20:24:47 +0530 Subject: [PATCH 31/33] Revert "Revert "fix review comments"" This reverts commit ac6d2c0a79720c453a6d393068af66e0bdcc4e76. --- table/builder.go | 60 +++++++++++++++++++++++++++++------------------- 1 file changed, 36 insertions(+), 24 deletions(-) diff --git a/table/builder.go b/table/builder.go index 9424536f2..eb79e3982 100644 --- a/table/builder.go +++ b/table/builder.go @@ -37,6 +37,10 @@ import ( const ( KB = 1024 MB = KB * 1024 + + // When a block is encrypted, it's length increases. We add 200 bytes of padding to + // handle cases when block size increases. This is an approximate number. + padding = 200 ) type header struct { @@ -72,7 +76,7 @@ type Builder struct { // Typically tens or hundreds of meg. This is for one single file. buf []byte sz int - bufLock sync.Mutex + bufLock sync.Mutex // This lock guards the buf. We acquire lock when we resize the buf. baseKey []byte // Base key for the current block. baseOffset uint32 // Offset for the current block. @@ -90,9 +94,9 @@ type Builder struct { // NewTableBuilder makes a new TableBuilder. func NewTableBuilder(opts Options) *Builder { b := &Builder{ - // Additional 2 MB to store index (approximate). + // Additional 5 MB to store index (approximate). // We trim the additional space in table.Finish(). - buf: make([]byte, opts.TableSize+MB*2), + buf: make([]byte, opts.TableSize+5*MB), tableIndex: &pb.TableIndex{}, keyHashes: make([]uint64, 0, 1024), // Avoid some malloc calls. opt: &opts, @@ -125,7 +129,7 @@ var slicePool = sync.Pool{ func (b *Builder) handleBlock() { defer b.wg.Done() for item := range b.blockChan { - // Extract the item + // Extract the block. blockBuf := item.data[item.start:item.end] var dst *[]byte // Compress the block. @@ -144,17 +148,27 @@ func (b *Builder) handleBlock() { blockBuf = eBlock } + // The newend should always be less than or equal to the original end + // plus the padding. If the new end is greater than item.end+padding + // that means the data from this block cannot be stored in its existing + // location and trying to copy it over would mean we would over-write + // some data of the next block. + y.AssertTruef(uint32(len(blockBuf)) <= item.end+padding, + "newend: %d item.end: %d padding: %d", len(blockBuf), item.end, padding) + + // Acquire the buflock here. The builder.grow function might change + // the b.buf while this goroutine was running. b.bufLock.Lock() // Copy over compressed/encrypted data back to the main buffer. copy(b.buf[item.start:], blockBuf) b.bufLock.Unlock() + // Fix the boundary of the block. + item.end = item.start + uint32(len(blockBuf)) + if dst != nil { slicePool.Put(dst) } - - newend := item.start + uint32(len(blockBuf)) - item.end = newend } } @@ -202,9 +216,8 @@ func (b *Builder) addHelper(key []byte, v y.ValueStruct, vpLen uint64) { b.append(h.Encode()) b.append(diffKey) - // Continue growing until we have enough space. - for uint32(len(b.buf)) < uint32(b.sz)+v.EncodedSize() { - b.grow() + if uint32(len(b.buf)) < uint32(b.sz)+v.EncodedSize() { + b.grow(int(v.EncodedSize())) } b.sz += v.Encode(b.buf[b.sz:]) @@ -214,29 +227,31 @@ func (b *Builder) addHelper(key []byte, v y.ValueStruct, vpLen uint64) { b.tableIndex.EstimatedSize += (sstSz + vpLen) } -// grow increases the size of b.buf by 50%. -func (b *Builder) grow() { +// grow increases the size of b.buf by atleast 50%. +func (b *Builder) grow(n int) { + if n < len(b.buf)/2 { + n = len(b.buf) / 2 + } b.bufLock.Lock() - newBuf := make([]byte, len(b.buf)+len(b.buf)/2) + newBuf := make([]byte, len(b.buf)+n) copy(newBuf, b.buf) b.buf = newBuf b.bufLock.Unlock() } func (b *Builder) append(data []byte) { - // Continue growing until we have enough space. - for len(b.buf) < b.sz+len(data) { - b.grow() + // Ensure we have enough space to store new data. + if len(b.buf) < b.sz+len(data) { + b.grow(len(data)) } copy(b.buf[b.sz:], data) b.sz += len(data) } func (b *Builder) addPadding(sz int) { - b.sz += sz - // Continue growing until we have enough space. - for len(b.buf) < b.sz { - b.grow() + if len(b.buf) < b.sz+sz { + b.grow(sz) } + b.sz += sz } /* @@ -264,9 +279,6 @@ func (b *Builder) finishBlock() { return } - // When a block is encrypted, it's length increases. We add 200 bytes to padding to - // handle cases when block size increases. - padding := 200 b.addPadding(padding) // Block end is the actual end of the block ignoring the padding. @@ -379,7 +391,7 @@ func (b *Builder) Finish() []byte { if len(b.blockList) > 0 { start := uint32(0) for i, bl := range b.blockList { - // Length of the block is start minues the end. + // Length of the block is start minus the end. b.tableIndex.Offsets[i].Len = bl.end - bl.start b.tableIndex.Offsets[i].Offset = start From 6f2a0fc6ceee70a34aff0c5aa2491a133b50715a Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Fri, 6 Mar 2020 15:18:26 +0530 Subject: [PATCH 32/33] Use dst instead of b.buf --- table/builder.go | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/table/builder.go b/table/builder.go index eb79e3982..a04393efd 100644 --- a/table/builder.go +++ b/table/builder.go @@ -386,21 +386,22 @@ func (b *Builder) Finish() []byte { // Wait for block handler to finish. b.wg.Wait() + dst := b.buf[:0] // Fix block boundaries. This includes moving the blocks so that we // don't have any interleaving space between them. if len(b.blockList) > 0 { - start := uint32(0) for i, bl := range b.blockList { // Length of the block is start minus the end. b.tableIndex.Offsets[i].Len = bl.end - bl.start - b.tableIndex.Offsets[i].Offset = start + // New offset of the block is the point in the main buffer till + // which we have written data. + b.tableIndex.Offsets[i].Offset = uint32(len(dst)) - // Copy over the block to the current position in the main buffer. - copy(b.buf[start:], b.buf[bl.start:bl.end]) - start = b.tableIndex.Offsets[i].Offset + b.tableIndex.Offsets[i].Len + dst = append(dst, b.buf[bl.start:bl.end]...) } - // Start writing to the buffer from the point until which we have valid data - b.sz = int(start) + // Start writing to the buffer from the point until which we have valid data. + // Fix the length because append and writeChecksum also rely on it. + b.sz = len(dst) } index, err := proto.Marshal(b.tableIndex) From 1b4a04f5e28a5a5807919b2b093d5e2493b3736f Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Fri, 6 Mar 2020 15:53:04 +0530 Subject: [PATCH 33/33] Cleanup --- table/builder.go | 50 +++++++++++++++++++++++++++--------------------- y/iterator.go | 4 ++-- 2 files changed, 30 insertions(+), 24 deletions(-) diff --git a/table/builder.go b/table/builder.go index a04393efd..3ac1d77d5 100644 --- a/table/builder.go +++ b/table/builder.go @@ -75,7 +75,7 @@ type bblock struct { type Builder struct { // Typically tens or hundreds of meg. This is for one single file. buf []byte - sz int + sz uint32 bufLock sync.Mutex // This lock guards the buf. We acquire lock when we resize the buf. baseKey []byte // Base key for the current block. @@ -209,15 +209,15 @@ func (b *Builder) addHelper(key []byte, v y.ValueStruct, vpLen uint64) { } // store current entry's offset - y.AssertTrue(uint32(b.sz) < math.MaxUint32) - b.entryOffsets = append(b.entryOffsets, uint32(b.sz)-b.baseOffset) + y.AssertTrue(b.sz < math.MaxUint32) + b.entryOffsets = append(b.entryOffsets, b.sz-b.baseOffset) // Layout: header, diffKey, value. b.append(h.Encode()) b.append(diffKey) - if uint32(len(b.buf)) < uint32(b.sz)+v.EncodedSize() { - b.grow(int(v.EncodedSize())) + if uint32(len(b.buf)) < b.sz+v.EncodedSize() { + b.grow(v.EncodedSize()) } b.sz += v.Encode(b.buf[b.sz:]) @@ -228,27 +228,28 @@ func (b *Builder) addHelper(key []byte, v y.ValueStruct, vpLen uint64) { } // grow increases the size of b.buf by atleast 50%. -func (b *Builder) grow(n int) { - if n < len(b.buf)/2 { - n = len(b.buf) / 2 +func (b *Builder) grow(n uint32) { + l := uint32(len(b.buf)) + if n < l/2 { + n = l / 2 } b.bufLock.Lock() - newBuf := make([]byte, len(b.buf)+n) + newBuf := make([]byte, l+n) copy(newBuf, b.buf) b.buf = newBuf b.bufLock.Unlock() } func (b *Builder) append(data []byte) { // Ensure we have enough space to store new data. - if len(b.buf) < b.sz+len(data) { - b.grow(len(data)) + if uint32(len(b.buf)) < b.sz+uint32(len(data)) { + b.grow(uint32(len(data))) } copy(b.buf[b.sz:], data) - b.sz += len(data) + b.sz += uint32(len(data)) } -func (b *Builder) addPadding(sz int) { - if len(b.buf) < b.sz+sz { +func (b *Builder) addPadding(sz uint32) { + if uint32(len(b.buf)) < b.sz+sz { b.grow(sz) } b.sz += sz @@ -347,13 +348,13 @@ func (b *Builder) Add(key []byte, value y.ValueStruct, valueLen uint32) { // ReachedCapacity returns true if we... roughly (?) reached capacity? func (b *Builder) ReachedCapacity(cap int64) bool { blocksSize := b.sz + // length of current buffer - len(b.entryOffsets)*4 + // all entry offsets size + uint32(len(b.entryOffsets)*4) + // all entry offsets size 4 + // count of all entry offsets 8 + // checksum bytes 4 // checksum length estimateSz := blocksSize + 4 + // Index length - 5*(len(b.tableIndex.Offsets)) // approximate index size + 5*(uint32(len(b.tableIndex.Offsets))) // approximate index size return int64(estimateSz) > cap } @@ -386,22 +387,27 @@ func (b *Builder) Finish() []byte { // Wait for block handler to finish. b.wg.Wait() - dst := b.buf[:0] + dst := b.buf // Fix block boundaries. This includes moving the blocks so that we // don't have any interleaving space between them. if len(b.blockList) > 0 { + dstLen := uint32(0) for i, bl := range b.blockList { - // Length of the block is start minus the end. - b.tableIndex.Offsets[i].Len = bl.end - bl.start + off := b.tableIndex.Offsets[i] + // Length of the block is end minus the start. + off.Len = bl.end - bl.start // New offset of the block is the point in the main buffer till // which we have written data. - b.tableIndex.Offsets[i].Offset = uint32(len(dst)) + off.Offset = dstLen - dst = append(dst, b.buf[bl.start:bl.end]...) + copy(dst[dstLen:], b.buf[bl.start:bl.end]) + + // New length is the start of the block plus its length. + dstLen = off.Offset + off.Len } // Start writing to the buffer from the point until which we have valid data. // Fix the length because append and writeChecksum also rely on it. - b.sz = len(dst) + b.sz = dstLen } index, err := proto.Marshal(b.tableIndex) diff --git a/y/iterator.go b/y/iterator.go index 31e6021e7..7c9b21194 100644 --- a/y/iterator.go +++ b/y/iterator.go @@ -64,12 +64,12 @@ func (v *ValueStruct) Decode(b []byte) { } // Encode expects a slice of length at least v.EncodedSize(). -func (v *ValueStruct) Encode(b []byte) int { +func (v *ValueStruct) Encode(b []byte) uint32 { b[0] = v.Meta b[1] = v.UserMeta sz := binary.PutUvarint(b[2:], v.ExpiresAt) n := copy(b[2+sz:], v.Value) - return 2 + sz + n + return uint32(2 + sz + n) } // EncodeTo should be kept in sync with the Encode function above. The reason