diff --git a/badger/cmd/bank.go b/badger/cmd/bank.go
index 5491eebd6..a312ca9e4 100644
--- a/badger/cmd/bank.go
+++ b/badger/cmd/bank.go
@@ -125,7 +125,7 @@ func toSlice(bal uint64) []byte {
 }
 
 func getBalance(txn *badger.Txn, account int) (uint64, error) {
-	item, err := txn.Get(key(account))
+	item, err := get(txn, key(account))
 	if err != nil {
 		return 0, err
 	}
@@ -197,6 +197,25 @@ func diff(a, b []account) string {
 
 var errFailure = errors.New("test failed due to balance mismatch")
 
+// get fetches the value for the key "k", either via the txn.Get API or the
+// iterator Seek API.
+func get(txn *badger.Txn, k []byte) (*badger.Item, error) {
+	if rand.Int()%2 == 0 {
+		return txn.Get(k)
+	}
+
+	iopt := badger.DefaultIteratorOptions
+	// PrefetchValues is expensive. We don't need it here.
+	iopt.PrefetchValues = false
+	it := txn.NewIterator(iopt)
+	defer it.Close()
+	it.Seek(k)
+	if it.Valid() {
+		return it.Item(), nil
+	}
+	return nil, badger.ErrKeyNotFound
+}
+
 // seekTotal retrieves the total of all accounts by seeking for each account key.
 func seekTotal(txn *badger.Txn) ([]account, error) {
 	expected := uint64(numAccounts) * uint64(initialBal)
@@ -204,7 +223,7 @@ func seekTotal(txn *badger.Txn) ([]account, error) {
 	var total uint64
 
 	for i := 0; i < numAccounts; i++ {
-		item, err := txn.Get(key(i))
+		item, err := get(txn, key(i))
 		if err != nil {
 			log.Printf("Error for account: %d. err=%v. key=%q\n", i, err, key(i))
 			return accounts, err
@@ -343,7 +362,11 @@ func runTest(cmd *cobra.Command, args []string) error {
 		WithNumMemtables(2).
 		// Do not GC any versions, because we need them for the dissect.
 		WithNumVersionsToKeep(int(math.MaxInt32)).
-		WithValueThreshold(1) // Make all values go to value log
+		WithValueThreshold(1). // Make all values go to value log
+		WithCompression(options.ZSTD).
+		WithKeepL0InMemory(false).
+		WithMaxCacheSize(10 << 20)
+
 	if mmap {
 		opts = opts.WithTableLoadingMode(options.MemoryMap)
 	}
diff --git a/badger/cmd/write_bench.go b/badger/cmd/write_bench.go
index 26e62e8f5..4d4bfc064 100644
--- a/badger/cmd/write_bench.go
+++ b/badger/cmd/write_bench.go
@@ -347,7 +347,6 @@ func reportStats(c *y.Closer, db *badger.DB) {
 			if showKeysCount {
 				showKeysStats(db)
 			}
-
 			// fetch directory contents
 			if showDir {
 				err := filepath.Walk(sstDir, func(path string, info os.FileInfo, err error) error {
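Reviewer note: besides randomizing the read path, the bank test now exercises ZSTD compression and the block cache. For readers who want to try this configuration standalone, a minimal sketch (the directory path is hypothetical; the options mirror the chain above):

```go
package main

import (
	"log"

	badger "github.com/dgraph-io/badger/v2"
	"github.com/dgraph-io/badger/v2/options"
)

func main() {
	opts := badger.DefaultOptions("/tmp/bank-test"). // hypothetical path
		WithCompression(options.ZSTD). // compress SSTable blocks
		WithKeepL0InMemory(false).     // push L0 tables through the block cache
		WithMaxCacheSize(10 << 20)     // 10 MB block cache, as in the test
	db, err := badger.Open(opts)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
}
```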
Need at least 2") } @@ -324,6 +324,12 @@ func Open(opt Options) (db *DB, err error) { MaxCost: int64(float64(opt.MaxCacheSize) * 0.95), BufferItems: 64, Metrics: true, + OnEvict: func(i *ristretto.Item) { + table.BlockEvictHandler(i.Value) + }, + OnReject: func(i *ristretto.Item) { + table.BlockEvictHandler(i.Value) + }, } db.blockCache, err = ristretto.NewCache(&config) if err != nil { @@ -986,6 +992,7 @@ func buildL0Table(ft flushTask, bopts table.Options) []byte { defer iter.Close() b := table.NewTableBuilder(bopts) defer b.Close() + var vp valuePointer for iter.SeekToFirst(); iter.Valid(); iter.Next() { if len(ft.dropPrefixes) > 0 && hasAnyPrefixes(iter.Key(), ft.dropPrefixes) { @@ -997,7 +1004,7 @@ func buildL0Table(ft flushTask, bopts table.Options) []byte { } b.Add(iter.Key(), iter.Value(), vp.Len) } - return b.Finish() + return b.Finish(true) } type flushTask struct { diff --git a/db2_test.go b/db2_test.go index 4c29d6358..18c6f82ff 100644 --- a/db2_test.go +++ b/db2_test.go @@ -547,7 +547,7 @@ func createTableWithRange(t *testing.T, db *DB, start, end int) *table.Table { fd, err := y.CreateSyncedFile(table.NewFilename(fileID, db.opt.Dir), true) require.NoError(t, err) - _, err = fd.Write(b.Finish()) + _, err = fd.Write(b.Finish(false)) require.NoError(t, err, "unable to write to file") tab, err := table.OpenTable(fd, bopts) @@ -670,16 +670,13 @@ func TestL0GCBug(t *testing.T) { // Simulate a crash by not closing db1 but releasing the locks. if db1.dirLockGuard != nil { require.NoError(t, db1.dirLockGuard.release()) + db1.dirLockGuard = nil } if db1.valueDirGuard != nil { require.NoError(t, db1.valueDirGuard.release()) + db1.valueDirGuard = nil } - for _, f := range db1.vlog.filesMap { - require.NoError(t, f.fd.Close()) - } - require.NoError(t, db1.registry.Close()) - require.NoError(t, db1.lc.close()) - require.NoError(t, db1.manifest.close()) + require.NoError(t, db1.Close()) db2, err := Open(opts) require.NoError(t, err) diff --git a/go.mod b/go.mod index 6cb85b77c..ae61b5e4e 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.12 require ( github.com/DataDog/zstd v1.4.1 github.com/cespare/xxhash v1.1.0 - github.com/dgraph-io/ristretto v0.0.3-0.20200630154024-f66de99634de + github.com/dgraph-io/ristretto v0.0.4-0.20200817124926-18e279725890 github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 github.com/dustin/go-humanize v1.0.0 github.com/golang/protobuf v1.3.1 diff --git a/go.sum b/go.sum index a4aa207f9..bdc803b74 100644 --- a/go.sum +++ b/go.sum @@ -13,8 +13,8 @@ github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwc github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/dgraph-io/ristretto v0.0.3-0.20200630154024-f66de99634de h1:t0UHb5vdojIDUqktM6+xJAfScFBsVpXZmqC9dsgJmeA= -github.com/dgraph-io/ristretto v0.0.3-0.20200630154024-f66de99634de/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E= +github.com/dgraph-io/ristretto v0.0.4-0.20200817124926-18e279725890 h1:/6pLcQq2GNdLPOotXztuLDXYRPraTIzZMPiJW8HzAwg= +github.com/dgraph-io/ristretto v0.0.4-0.20200817124926-18e279725890/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E= github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA= github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod 
diff --git a/go.sum b/go.sum
index a4aa207f9..bdc803b74 100644
--- a/go.sum
+++ b/go.sum
@@ -13,8 +13,8 @@ github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwc
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
-github.com/dgraph-io/ristretto v0.0.3-0.20200630154024-f66de99634de h1:t0UHb5vdojIDUqktM6+xJAfScFBsVpXZmqC9dsgJmeA=
-github.com/dgraph-io/ristretto v0.0.3-0.20200630154024-f66de99634de/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E=
+github.com/dgraph-io/ristretto v0.0.4-0.20200817124926-18e279725890 h1:/6pLcQq2GNdLPOotXztuLDXYRPraTIzZMPiJW8HzAwg=
+github.com/dgraph-io/ristretto v0.0.4-0.20200817124926-18e279725890/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E=
 github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA=
 github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
 github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
diff --git a/key_registry.go b/key_registry.go
index db32acd1a..d995c3cc7 100644
--- a/key_registry.go
+++ b/key_registry.go
@@ -159,7 +159,7 @@ func validRegistry(fp *os.File, encryptionKey []byte) error {
 		}
 		if len(encryptionKey) > 0 {
 			// Decrypting sanity text.
-			if eSanityText, err = y.XORBlock(eSanityText, encryptionKey, iv); err != nil {
+			if eSanityText, err = y.XORBlockAllocate(eSanityText, encryptionKey, iv); err != nil {
 				return y.Wrapf(err, "During validRegistry")
 			}
 		}
@@ -200,7 +200,7 @@ func (kri *keyRegistryIterator) next() (*pb.DataKey, error) {
 	}
 	if len(kri.encryptionKey) > 0 {
 		// Decrypt the key if the storage key exists.
-		if dataKey.Data, err = y.XORBlock(dataKey.Data, kri.encryptionKey, dataKey.Iv); err != nil {
+		if dataKey.Data, err = y.XORBlockAllocate(dataKey.Data, kri.encryptionKey, dataKey.Iv); err != nil {
 			return nil, y.Wrapf(err, "While decrypting datakey in keyRegistryIterator.next")
 		}
 	}
@@ -254,7 +254,7 @@ func WriteKeyRegistry(reg *KeyRegistry, opt KeyRegistryOptions) error {
 	eSanity := sanityText
 	if len(opt.EncryptionKey) > 0 {
 		var err error
-		eSanity, err = y.XORBlock(eSanity, opt.EncryptionKey, iv)
+		eSanity, err = y.XORBlockAllocate(eSanity, opt.EncryptionKey, iv)
 		if err != nil {
 			return y.Wrapf(err, "Error while encrypting sanity text in WriteKeyRegistry")
 		}
@@ -395,7 +395,7 @@ func storeDataKey(buf *bytes.Buffer, storageKey []byte, k *pb.DataKey) error {
 		return nil
 	}
 	var err error
-	k.Data, err = y.XORBlock(k.Data, storageKey, k.Iv)
+	k.Data, err = y.XORBlockAllocate(k.Data, storageKey, k.Iv)
 	return err
 }
 
 // In memory datakey will be plain text so encrypting before storing to the disk.
diff --git a/levels.go b/levels.go
index f0af17de4..769cf4c45 100644
--- a/levels.go
+++ b/levels.go
@@ -657,7 +657,7 @@ nextTable:
 				numKeys, numSkips, time.Since(timeStart))
 			if builder.Empty() {
 				// Cleanup builder resources:
-				builder.Finish()
+				builder.Finish(false)
 				builder.Close()
 				continue
 			}
@@ -677,7 +677,7 @@ nextTable:
 					return nil, errors.Wrapf(err, "While opening new table: %d", fileID)
 				}
 
-				if _, err := fd.Write(builder.Finish()); err != nil {
+				if _, err := fd.Write(builder.Finish(false)); err != nil {
 					return nil, errors.Wrapf(err, "Unable to write to file: %d", fileID)
 				}
 				tbl, err := table.OpenTable(fd, bopts)
@@ -688,7 +688,7 @@ nextTable:
 			var tbl *table.Table
 			var err error
 			if s.kv.opt.InMemory {
-				tbl, err = table.OpenInMemoryTable(builder.Finish(), fileID, &bopts)
+				tbl, err = table.OpenInMemoryTable(builder.Finish(true), fileID, &bopts)
 			} else {
 				tbl, err = build(fileID)
 			}
@@ -700,6 +700,9 @@ nextTable:
 
 			mu.Lock()
 			newTables = append(newTables, tbl)
+			num := atomic.LoadInt32(&table.NumBlocks)
+			allocs := float64(atomic.LoadInt64(&y.NumAllocs)) / float64((1 << 20))
+			s.kv.opt.Debugf("Num Blocks: %d. Num Allocs (MB): %.2f\n", num, allocs)
 			mu.Unlock()
 		}(builder)
 	}
diff --git a/levels_test.go b/levels_test.go
index 0ef4659c4..f8aa65e89 100644
--- a/levels_test.go
+++ b/levels_test.go
@@ -49,7 +49,7 @@ func createAndOpen(db *DB, td []keyValVersion, level int) {
 		panic(err)
 	}
 
-	if _, err = fd.Write(b.Finish()); err != nil {
+	if _, err = fd.Write(b.Finish(false)); err != nil {
 		panic(err)
 	}
 	tab, err := table.OpenTable(fd, opts)
@@ -740,7 +740,7 @@ func createEmptyTable(db *DB) *table.Table {
 	b.Add(y.KeyWithTs([]byte("foo"), 1), y.ValueStruct{}, 0)
 
 	// Open table in memory to avoid adding changes to manifest file.
-	tab, err := table.OpenInMemoryTable(b.Finish(), db.lc.reserveFileID(), &opts)
+	tab, err := table.OpenInMemoryTable(b.Finish(true), db.lc.reserveFileID(), &opts)
 	if err != nil {
 		panic(err)
 	}
diff --git a/manifest_test.go b/manifest_test.go
index 5062b3f1b..c52e719d5 100644
--- a/manifest_test.go
+++ b/manifest_test.go
@@ -155,7 +155,7 @@ func buildTable(t *testing.T, keyValues [][]string, bopts table.Options) *os.Fil
 			UserMeta: 0,
 		}, 0)
 	}
-	_, err = f.Write(b.Finish())
+	_, err = f.Write(b.Finish(false))
 	require.NoError(t, err, "unable to write to file.")
 	f.Close()
 	f, _ = y.OpenSyncedFile(filename, true)
diff --git a/options.go b/options.go
index acf442c2b..9fa4ca21e 100644
--- a/options.go
+++ b/options.go
@@ -122,7 +122,7 @@ func DefaultOptions(path string) Options {
 		Dir:                 path,
 		ValueDir:            path,
 		LevelOneSize:        256 << 20,
-		LevelSizeMultiplier: 10,
+		LevelSizeMultiplier: 15,
 		TableLoadingMode:    options.MemoryMap,
 		ValueLogLoadingMode: options.MemoryMap,
 		// table.MemoryMap to mmap() the tables.
@@ -176,6 +176,7 @@ func DefaultOptions(path string) Options {
 
 func buildTableOptions(opt Options) table.Options {
 	return table.Options{
+		TableSize:          uint64(opt.MaxTableSize),
 		BlockSize:          opt.BlockSize,
 		BloomFalsePositive: opt.BloomFalsePositive,
 		LoadBloomsOnOpen:   opt.LoadBloomsOnOpen,
@@ -230,17 +231,6 @@ func (opt Options) WithValueDir(val string) Options {
 	return opt
 }
 
-// WithLoggingLevel returns a new Options value with logging level of the
-// default logger set to the given value.
-// LoggingLevel sets the level of logging. It should be one of DEBUG, INFO,
-// WARNING or ERROR levels.
-//
-// The default value of LoggingLevel is INFO.
-func (opt Options) WithLoggingLevel(val loggingLevel) Options {
-	opt.Logger = defaultLogger(val)
-	return opt
-}
-
 // WithSyncWrites returns a new Options value with SyncWrites set to the given value.
 //
 // When SyncWrites is true all writes are synced to disk. Setting this to false would achieve better
@@ -318,6 +308,17 @@ func (opt Options) WithLogger(val Logger) Options {
 	return opt
 }
 
+// WithLoggingLevel returns a new Options value with logging level of the
+// default logger set to the given value.
+// LoggingLevel sets the level of logging. It should be one of DEBUG, INFO,
+// WARNING or ERROR levels.
+//
+// The default value of LoggingLevel is INFO.
+func (opt Options) WithLoggingLevel(val loggingLevel) Options {
+	opt.Logger = defaultLogger(val)
+	return opt
+}
+
 // WithMaxTableSize returns a new Options value with MaxTableSize set to the given value.
 //
 // MaxTableSize sets the maximum size in bytes for each LSM table or file.
@@ -335,7 +336,7 @@ func (opt Options) WithMaxTableSize(val int64) Options {
 // Once a level grows to be larger than this ratio allowed, the compaction process will be
 // triggered.
 //
-// The default value of LevelSizeMultiplier is 10.
+// The default value of LevelSizeMultiplier is 15.
 func (opt Options) WithLevelSizeMultiplier(val int) Options {
 	opt.LevelSizeMultiplier = val
 	return opt
@@ -460,7 +461,7 @@ func (opt Options) WithValueLogMaxEntries(val uint32) Options {
 // NumCompactors sets the number of compaction workers to run concurrently.
 // Setting this to zero stops compactions, which could eventually cause writes to block forever.
 //
-// The default value of NumCompactors is 2. One is dedicated just for L0.
+// The default value of NumCompactors is 2. One is dedicated just for L0 and L1.
 func (opt Options) WithNumCompactors(val int) Options {
 	opt.NumCompactors = val
 	return opt
 }
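For context on the 10 to 15 bump: LevelSizeMultiplier fixes the capacity ratio between adjacent LSM levels, so a larger multiplier yields fewer, fatter levels (fewer hops per lookup) at the cost of heavier individual compactions. A quick worked calculation under the default 256 MB LevelOneSize:

```go
package main

import "fmt"

func main() {
	size := int64(256 << 20) // default LevelOneSize
	for lvl := 1; lvl <= 4; lvl++ {
		fmt.Printf("L%d capacity ~ %d MB\n", lvl, size>>20)
		size *= 15 // new default LevelSizeMultiplier
	}
	// Prints roughly: L1 256 MB, L2 3840 MB, L3 57600 MB, L4 864000 MB.
}
```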
diff --git a/stream_writer.go b/stream_writer.go
index 3ed865c75..4e3793419 100644
--- a/stream_writer.go
+++ b/stream_writer.go
@@ -377,6 +377,7 @@ func (w *sortedWriter) send(done bool) error {
 		return err
 	}
 	go func(builder *table.Builder) {
+		defer builder.Close()
 		err := w.createTable(builder)
 		w.throttle.Done(err)
 	}(w.builder)
@@ -410,7 +411,7 @@ func (w *sortedWriter) Done() error {
 }
 
 func (w *sortedWriter) createTable(builder *table.Builder) error {
-	data := builder.Finish()
+	data := builder.Finish(w.db.opt.InMemory)
 	if len(data) == 0 {
 		return nil
 	}
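The new boolean on Finish decides who owns the returned bytes: Finish(false) hands back a window into the builder's manually allocated buffer, valid only until Close, while Finish(true) copies into a fresh Go slice (needed when the table outlives the builder, as for in-memory tables; hence w.db.opt.InMemory above). The disk-path discipline, sketched:

```go
data := builder.Finish(false) // a window into the builder's Calloc'd buffer
if _, err := fd.Write(data); err != nil { // must complete before Close
	return err
}
builder.Close() // frees the buffer; data must not be touched after this
```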
diff --git a/table/builder.go b/table/builder.go
index c5f60ae33..b74449805 100644
--- a/table/builder.go
+++ b/table/builder.go
@@ -17,11 +17,13 @@
 package table
 
 import (
-	"bytes"
 	"crypto/aes"
 	"math"
+	"runtime"
+	"sync"
 	"unsafe"
 
+	"github.com/DataDog/zstd"
 	"github.com/dgryski/go-farm"
 	"github.com/golang/protobuf/proto"
 	"github.com/golang/snappy"
@@ -33,11 +35,14 @@ import (
 	"github.com/dgraph-io/ristretto/z"
 )
 
-func newBuffer(sz int) *bytes.Buffer {
-	b := new(bytes.Buffer)
-	b.Grow(sz)
-	return b
-}
+const (
+	KB = 1024
+	MB = KB * 1024
+
+	// When a block is encrypted, its length increases. We add 256 bytes of padding to
+	// handle cases when the block size increases. This is an approximate number.
+	padding = 256
+)
 
 type header struct {
 	overlap uint16 // Overlap with base key.
@@ -61,10 +66,18 @@ func (h *header) Decode(buf []byte) {
 	copy(((*[headerSize]byte)(unsafe.Pointer(h))[:]), buf[:headerSize])
 }
 
+type bblock struct {
+	data  []byte
+	start uint32 // Points to the starting offset of the block.
+	end   uint32 // Points to the end offset of the block.
+}
+
 // Builder is used in building a table.
 type Builder struct {
 	// Typically tens or hundreds of meg. This is for one single file.
-	buf *bytes.Buffer
+	buf     []byte
+	sz      uint32
+	bufLock sync.Mutex // This lock guards the buf. We acquire lock when we resize the buf.
 
 	baseKey      []byte // Base key for the current block.
 	baseOffset   uint32 // Offset for the current block.
@@ -72,23 +85,90 @@ type Builder struct {
 	tableIndex *pb.TableIndex
 	keyHashes  []uint64 // Used for building the bloomfilter.
 	opt        *Options
+
+	// Used to concurrently compress/encrypt blocks.
+	wg        sync.WaitGroup
+	blockChan chan *bblock
+	blockList []*bblock
 }
 
 // NewTableBuilder makes a new TableBuilder.
 func NewTableBuilder(opts Options) *Builder {
-	return &Builder{
-		buf:        newBuffer(1 << 20),
+	b := &Builder{
+		// Additional 16 MB to store index (approximate).
+		// We trim the additional space in table.Finish().
+		buf:        y.Calloc(int(opts.TableSize + 16*MB)),
 		tableIndex: &pb.TableIndex{},
 		keyHashes:  make([]uint64, 0, 1024), // Avoid some malloc calls.
 		opt:        &opts,
 	}
+
+	// If encryption or compression is not enabled, do not start compression/encryption
+	// goroutines and write directly to the buffer.
+	if b.opt.Compression == options.None && b.opt.DataKey == nil {
+		return b
+	}
+
+	count := 2 * runtime.NumCPU()
+	b.blockChan = make(chan *bblock, count*2)
+
+	b.wg.Add(count)
+	for i := 0; i < count; i++ {
+		go b.handleBlock()
+	}
+	return b
+}
+
+func (b *Builder) handleBlock() {
+	defer b.wg.Done()
+
+	doCompress := b.opt.Compression != options.None
+	for item := range b.blockChan {
+		// Extract the block.
+		blockBuf := item.data[item.start:item.end]
+		// Compress the block.
+		if doCompress {
+			var err error
+			blockBuf, err = b.compressData(blockBuf)
+			y.Check(err)
+		}
+		if b.shouldEncrypt() {
+			eBlock, err := b.encrypt(blockBuf, doCompress)
+			y.Check(y.Wrapf(err, "Error while encrypting block in table builder."))
+			blockBuf = eBlock
+		}
+
+		// blockBuf should always be less than or equal to the allocated space. If blockBuf is
+		// greater than the allocated space, the data from this block cannot be stored in its
+		// existing location; trying to copy it over would overwrite some data of the next block.
+		allocatedSpace := (item.end - item.start) + padding + 1
+		y.AssertTruef(uint32(len(blockBuf)) <= allocatedSpace, "newend: %d oldend: %d padding: %d",
+			item.start+uint32(len(blockBuf)), item.end, padding)
+
+		// Acquire the bufLock here. The builder.grow function might change
+		// b.buf while this goroutine is running.
+		b.bufLock.Lock()
+		// Copy over compressed/encrypted data back to the main buffer.
+		copy(b.buf[item.start:], blockBuf)
+		b.bufLock.Unlock()
+
+		// Fix the boundary of the block.
+		item.end = item.start + uint32(len(blockBuf))
+
+		if doCompress {
+			y.Free(blockBuf)
+		}
+	}
 }
 
 // Close closes the TableBuilder.
-func (b *Builder) Close() {}
+func (b *Builder) Close() {
+	y.Free(b.buf)
+}
 
 // Empty returns whether it's empty.
-func (b *Builder) Empty() bool { return b.buf.Len() == 0 }
+func (b *Builder) Empty() bool { return b.sz == 0 }
 
 // keyDiff returns a suffix of newKey that is different from b.baseKey.
 func (b *Builder) keyDiff(newKey []byte) []byte {
@@ -124,20 +204,55 @@ func (b *Builder) addHelper(key []byte, v y.ValueStruct, vpLen uint64) {
 	}
 
 	// store current entry's offset
-	y.AssertTrue(uint32(b.buf.Len()) < math.MaxUint32)
-	b.entryOffsets = append(b.entryOffsets, uint32(b.buf.Len())-b.baseOffset)
+	y.AssertTrue(b.sz < math.MaxUint32)
+	b.entryOffsets = append(b.entryOffsets, b.sz-b.baseOffset)
 
 	// Layout: header, diffKey, value.
-	b.buf.Write(h.Encode())
-	b.buf.Write(diffKey) // We only need to store the key difference.
+	b.append(h.Encode())
+	b.append(diffKey)
+
+	if uint32(len(b.buf)) < b.sz+v.EncodedSize() {
+		b.grow(v.EncodedSize())
+	}
+	b.sz += v.Encode(b.buf[b.sz:])
 
-	v.EncodeTo(b.buf)
 	// Size of KV on SST.
 	sstSz := uint64(uint32(headerSize) + uint32(len(diffKey)) + v.EncodedSize())
 	// Total estimated size = size on SST + size on vlog (length of value pointer).
 	b.tableIndex.EstimatedSize += (sstSz + vpLen)
 }
 
+// grow increases the size of b.buf by at least 50%.
+func (b *Builder) grow(n uint32) {
+	l := uint32(len(b.buf))
+	if n < l/2 {
+		n = l / 2
+	}
+	newBuf := y.Calloc(int(l + n))
+	y.AssertTrue(uint32(len(newBuf)) == l+n)
+
+	b.bufLock.Lock()
+	copy(newBuf, b.buf)
+	y.Free(b.buf)
+	b.buf = newBuf
+	b.bufLock.Unlock()
+}
+
+func (b *Builder) append(data []byte) {
+	// Ensure we have enough space to store new data.
+	if uint32(len(b.buf)) < b.sz+uint32(len(data)) {
+		b.grow(uint32(len(data)))
+	}
+	copy(b.buf[b.sz:], data)
+	b.sz += uint32(len(data))
+}
+
+func (b *Builder) addPadding(sz uint32) {
+	if uint32(len(b.buf)) < b.sz+sz {
+		b.grow(sz)
+	}
+	b.sz += sz
+}
+
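The handleBlock workers above follow a common fan-out shape: the producer appends blocks to the buffer and sends their bounds on a channel, 2x NumCPU goroutines transform each span in place, and Finish later closes the channel and waits. A simplified, generic sketch of that shape (not Badger's exact code; assumes `runtime` and `sync` are imported, and `transform` stands in for compress/encrypt and must not outgrow the padded slot):

```go
type span struct{ start, end uint32 }

func processSpans(buf []byte, spans []*span, transform func([]byte) []byte) {
	ch := make(chan *span, 2*runtime.NumCPU())
	var wg sync.WaitGroup
	for i := 0; i < 2*runtime.NumCPU(); i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for s := range ch {
				out := transform(buf[s.start:s.end])
				copy(buf[s.start:], out)           // relies on padding reserved past s.end
				s.end = s.start + uint32(len(out)) // record the block's new boundary
			}
		}()
	}
	for _, s := range spans {
		ch <- s
	}
	close(ch)
	wg.Wait() // all boundaries fixed; the caller can now compact the gaps
}
```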
@@ -151,41 +266,36 @@ Structure of Block.
 +-------------------+---------------------+--------------------+--------------+------------------+
 | Entry1            | Entry2              | Entry3             | Entry4       | Entry5           |
 +-------------------+---------------------+--------------------+--------------+------------------+
 | Entry6            | ...                 | ...                | ...          | EntryN           |
 +-------------------+---------------------+--------------------+--------------+------------------+
 | Block Meta(contains list of offsets used| Block Meta Size    | Block        | Checksum Size    |
 | to perform binary search in the block)  | (4 Bytes)          | Checksum     | (4 Bytes)        |
 +-----------------------------------------+--------------------+--------------+------------------+
 */
 // In case the data is encrypted, the "IV" is added to the end of the block.
 func (b *Builder) finishBlock() {
-	b.buf.Write(y.U32SliceToBytes(b.entryOffsets))
-	b.buf.Write(y.U32ToBytes(uint32(len(b.entryOffsets))))
-
-	blockBuf := b.buf.Bytes()[b.baseOffset:] // Store checksum for current block.
-	b.writeChecksum(blockBuf)
-
-	// Compress the block.
-	if b.opt.Compression != options.None {
-		var err error
-		// TODO: Find a way to reuse buffers. Current implementation creates a
-		// new buffer for each compressData call.
-		blockBuf, err = b.compressData(b.buf.Bytes()[b.baseOffset:])
-		y.Check(err)
-		// Truncate already written data.
-		b.buf.Truncate(int(b.baseOffset))
-		// Write compressed data.
-		b.buf.Write(blockBuf)
-	}
-	if b.shouldEncrypt() {
-		block := b.buf.Bytes()[b.baseOffset:]
-		eBlock, err := b.encrypt(block)
-		y.Check(y.Wrapf(err, "Error while encrypting block in table builder."))
-		// We're rewriting the block, after encrypting.
-		b.buf.Truncate(int(b.baseOffset))
-		b.buf.Write(eBlock)
+	b.append(y.U32SliceToBytes(b.entryOffsets))
+	b.append(y.U32ToBytes(uint32(len(b.entryOffsets))))
+
+	b.writeChecksum(b.buf[b.baseOffset:b.sz])
+
+	// If compression/encryption is disabled, no need to send the block to the blockChan.
+	// There's nothing to be done.
+	if b.blockChan == nil {
+		b.addBlockToIndex()
+		return
 	}
 
-	// TODO(Ashish):Add padding: If we want to make block as multiple of OS pages, we can
-	// implement padding. This might be useful while using direct I/O.
+	b.addPadding(padding)
+
+	// Block end is the actual end of the block, ignoring the padding.
+	block := &bblock{start: b.baseOffset, end: uint32(b.sz - padding), data: b.buf}
+	b.blockList = append(b.blockList, block)
 
-	// Add key to the block index
+	b.addBlockToIndex()
+	// Push to the block handler.
+	b.blockChan <- block
+}
+
+func (b *Builder) addBlockToIndex() {
+	blockBuf := b.buf[b.baseOffset:b.sz]
+	// Add key to the block index.
 	bo := &pb.BlockOffset{
 		Key:    y.Copy(b.baseKey),
 		Offset: b.baseOffset,
-		Len:    uint32(b.buf.Len()) - b.baseOffset,
+		Len:    uint32(len(blockBuf)),
 	}
 	b.tableIndex.Offsets = append(b.tableIndex.Offsets, bo)
 }
@@ -203,7 +313,7 @@ func (b *Builder) shouldFinishBlock(key []byte, value y.ValueStruct) bool {
 		4 + // size of list
 		8 + // Sum64 in checksum proto
 		4) // checksum length
-	estimatedSize := uint32(b.buf.Len()) - b.baseOffset + uint32(6 /*header size for entry*/) +
+	estimatedSize := uint32(b.sz) - b.baseOffset + uint32(6 /*header size for entry*/) +
 		uint32(len(key)) + uint32(value.EncodedSize()) + entriesOffsetsSize
 
 	if b.shouldEncrypt() {
@@ -211,6 +321,9 @@ func (b *Builder) shouldFinishBlock(key []byte, value y.ValueStruct) bool {
 		// So, size of IV is added to estimatedSize.
 		estimatedSize += aes.BlockSize
 	}
+	// Integer overflow check for table size.
+	y.AssertTrue(uint64(b.sz)+uint64(estimatedSize) < math.MaxUint32)
+
 	return estimatedSize > uint32(b.opt.BlockSize)
 }
 
@@ -220,8 +333,8 @@ func (b *Builder) Add(key []byte, value y.ValueStruct, valueLen uint32) {
 		b.finishBlock()
 		// Start a new block. Initialize the block.
 		b.baseKey = []byte{}
-		y.AssertTrue(uint32(b.buf.Len()) < math.MaxUint32)
-		b.baseOffset = uint32(b.buf.Len())
+		y.AssertTrue(uint32(b.sz) < math.MaxUint32)
+		b.baseOffset = uint32((b.sz))
 		b.entryOffsets = b.entryOffsets[:0]
 	}
 	b.addHelper(key, value, uint64(valueLen))
 }
@@ -235,14 +348,14 @@ func (b *Builder) Add(key []byte, value y.ValueStruct, valueLen uint32) {
 
 // ReachedCapacity returns true if we... roughly (?) reached capacity?
 func (b *Builder) ReachedCapacity(cap int64) bool {
-	blocksSize := b.buf.Len() + // length of current buffer
-		len(b.entryOffsets)*4 + // all entry offsets size
+	blocksSize := b.sz + // length of current buffer
+		uint32(len(b.entryOffsets)*4) + // all entry offsets size
 		4 + // count of all entry offsets
 		8 + // checksum bytes
 		4 // checksum length
 
 	estimateSz := blocksSize +
 		4 + // Index length
-		5*(len(b.tableIndex.Offsets)) // approximate index size
+		5*(uint32(len(b.tableIndex.Offsets))) // approximate index size
 
 	return int64(estimateSz) > cap
 }
@@ -259,7 +372,7 @@ The table structure looks like
 +---------+------------+-----------+---------------+
 | Block 1 | Block 2    | Block 3   | Block 4       |
 +---------+------------+-----------+---------------+
 | Block 5 | Block 6    | Block ... | Block N       |
 +---------+------------+-----------+---------------+
 | Index   | Index Size | Checksum  | Checksum Size |
 +---------+------------+-----------+---------------+
 */
 // In case the data is encrypted, the "IV" is added to the end of the index.
-func (b *Builder) Finish() []byte {
+func (b *Builder) Finish(allocate bool) []byte {
 	if b.opt.BloomFalsePositive > 0 {
 		bf := z.NewBloomFilter(float64(len(b.keyHashes)), b.opt.BloomFalsePositive)
 		for _, h := range b.keyHashes {
@@ -271,24 +384,52 @@ func (b *Builder) Finish() []byte {
 
 	b.finishBlock() // This will never start a new block.
 
+	if b.blockChan != nil {
+		close(b.blockChan)
+	}
+	// Wait for block handler to finish.
+	b.wg.Wait()
+
+	dst := b.buf
+	// Fix block boundaries. This includes moving the blocks so that we
+	// don't have any interleaving space between them.
+	if len(b.blockList) > 0 {
+		dstLen := uint32(0)
+		for i, bl := range b.blockList {
+			off := b.tableIndex.Offsets[i]
+			// Length of the block is end minus the start.
+			off.Len = bl.end - bl.start
+			// New offset of the block is the point in the main buffer till
+			// which we have written data.
+			off.Offset = dstLen
+
+			copy(dst[dstLen:], b.buf[bl.start:bl.end])
+
+			// New length is the start of the block plus its length.
+			dstLen = off.Offset + off.Len
+		}
+		// Start writing to the buffer from the point until which we have valid data.
+		// Fix the length because append and writeChecksum also rely on it.
+		b.sz = dstLen
+	}
+
 	index, err := proto.Marshal(b.tableIndex)
 	y.Check(err)
 
 	if b.shouldEncrypt() {
-		index, err = b.encrypt(index)
+		index, err = b.encrypt(index, false)
 		y.Check(err)
 	}
-	// Write index the file.
-	n, err := b.buf.Write(index)
-	y.Check(err)
-
-	y.AssertTrue(uint32(n) < math.MaxUint32)
-	// Write index size.
-	_, err = b.buf.Write(y.U32ToBytes(uint32(n)))
-	y.Check(err)
+	// Write the index to the buffer.
+	b.append(index)
+	b.append(y.U32ToBytes(uint32(len(index))))
 
 	b.writeChecksum(index)
-	return b.buf.Bytes()
+
+	if allocate {
+		return append([]byte{}, b.buf[:b.sz]...)
+	}
+	return b.buf[:b.sz]
 }
 
@@ -309,13 +450,10 @@ func (b *Builder) writeChecksum(data []byte) {
 	// Write checksum to the file.
 	chksum, err := proto.Marshal(&checksum)
 	y.Check(err)
-	n, err := b.buf.Write(chksum)
-	y.Check(err)
+	b.append(chksum)
 
-	y.AssertTrue(uint32(n) < math.MaxUint32)
 	// Write checksum size.
-	_, err = b.buf.Write(y.U32ToBytes(uint32(n)))
-	y.Check(err)
+	b.append(y.U32ToBytes(uint32(len(chksum))))
 }
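Once the workers finish, every block may have shrunk by a different amount, so Finish runs what is effectively a segment compaction: slide each block left over the stale padding and rewrite its index offset. The loop above, reduced to its essentials (names follow the diff; a sketch, not the verbatim code):

```go
dstLen := uint32(0)
for i, bl := range b.blockList {
	off := b.tableIndex.Offsets[i]
	off.Len = bl.end - bl.start // post-compression/encryption length
	off.Offset = dstLen         // the block's new home
	copy(b.buf[dstLen:], b.buf[bl.start:bl.end])
	dstLen += off.Len // the next block starts where this one ended
}
b.sz = dstLen // index, index size, and checksum are appended after this point
```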
 
 // DataKey returns datakey of the builder.
@@ -325,17 +463,32 @@ func (b *Builder) DataKey() *pb.DataKey {
 
 // encrypt will encrypt the given data and appends IV to the end of the encrypted data.
 // This should only be called after checking the shouldEncrypt method.
-func (b *Builder) encrypt(data []byte) ([]byte, error) {
+func (b *Builder) encrypt(data []byte, viaC bool) ([]byte, error) {
 	iv, err := y.GenerateIV()
 	if err != nil {
 		return data, y.Wrapf(err, "Error while generating IV in Builder.encrypt")
 	}
-	data, err = y.XORBlock(data, b.DataKey().Data, iv)
-	if err != nil {
+	needSz := len(data) + len(iv)
+	var dst []byte
+	if viaC {
+		dst = y.Calloc(needSz)
+	} else {
+		dst = make([]byte, needSz)
+	}
+	dst = dst[:len(data)]
+
+	if err = y.XORBlock(dst, data, b.DataKey().Data, iv); err != nil {
+		if viaC {
+			y.Free(dst)
+		}
 		return data, y.Wrapf(err, "Error while encrypting in Builder.encrypt")
 	}
-	data = append(data, iv...)
-	return data, nil
+	if viaC {
+		y.Free(data)
+	}
+
+	y.AssertTrue(cap(dst)-len(dst) >= len(iv))
+	return append(dst, iv...), nil
 }
 
 // shouldEncrypt tells us whether to encrypt the data or not.
@@ -350,9 +503,13 @@ func (b *Builder) compressData(data []byte) ([]byte, error) {
 	case options.None:
 		return data, nil
 	case options.Snappy:
-		return snappy.Encode(nil, data), nil
+		sz := snappy.MaxEncodedLen(len(data))
+		dst := y.Calloc(sz)
+		return snappy.Encode(dst, data), nil
 	case options.ZSTD:
-		return y.ZSTDCompress(nil, data, b.opt.ZSTDCompressionLevel)
+		sz := zstd.CompressBound(len(data))
+		dst := y.Calloc(sz)
+		return y.ZSTDCompress(dst, data, b.opt.ZSTDCompressionLevel)
 	}
 	return nil, errors.New("Unsupported compression type")
 }
diff --git a/table/builder_test.go b/table/builder_test.go
index 05978bc16..6b80587d8 100644
--- a/table/builder_test.go
+++ b/table/builder_test.go
@@ -33,30 +33,60 @@ import (
 
 func TestTableIndex(t *testing.T) {
 	rand.Seed(time.Now().Unix())
-	keyPrefix := "key"
-	t.Run("single key", func(t *testing.T) {
-		opts := Options{Compression: options.ZSTD}
-		f := buildTestTable(t, keyPrefix, 1, opts)
-		tbl, err := OpenTable(f, opts)
-		require.NoError(t, err)
-		require.Len(t, tbl.blockIndex, 1)
-	})
+	keysCount := 100000
+	key := make([]byte, 32)
+	_, err := rand.Read(key)
+	require.NoError(t, err)
+	subTest := []struct {
+		name string
+		opts Options
+	}{
+		{
+			name: "No encryption/compression",
+			opts: Options{
+				BlockSize:          4 * 1024,
+				BloomFalsePositive: 0.01,
+				TableSize:          30 << 20,
+			},
+		},
+		{
+			// Encryption mode.
+			name: "Only encryption",
+			opts: Options{
+				BlockSize:          4 * 1024,
+				BloomFalsePositive: 0.01,
+				TableSize:          30 << 20,
+				DataKey:            &pb.DataKey{Data: key},
+			},
+		},
+		{
+			// Compression mode.
+			name: "Only compression",
+			opts: Options{
+				BlockSize:            4 * 1024,
+				BloomFalsePositive:   0.01,
+				TableSize:            30 << 20,
+				Compression:          options.ZSTD,
+				ZSTDCompressionLevel: 3,
+			},
+		},
+		{
+			// Compression mode and encryption.
+			name: "Compression and encryption",
+			opts: Options{
+				BlockSize:            4 * 1024,
+				BloomFalsePositive:   0.01,
+				TableSize:            30 << 20,
+				Compression:          options.ZSTD,
+				ZSTDCompressionLevel: 3,
+				DataKey:              &pb.DataKey{Data: key},
+			},
+		},
+	}
 
-	t.Run("multiple keys", func(t *testing.T) {
-		opts := []Options{}
-		// Normal mode.
-		opts = append(opts, Options{BlockSize: 4 * 1024, BloomFalsePositive: 0.01})
-		// Encryption mode.
-		key := make([]byte, 32)
-		_, err := rand.Read(key)
-		require.NoError(t, err)
-		opts = append(opts, Options{BlockSize: 4 * 1024, BloomFalsePositive: 0.01,
-			DataKey: &pb.DataKey{Data: key}})
-		// Compression mode.
-		opts = append(opts, Options{BlockSize: 4 * 1024, BloomFalsePositive: 0.01,
-			Compression: options.ZSTD})
-		keysCount := 10000
-		for _, opt := range opts {
+	for _, tt := range subTest {
+		t.Run(tt.name, func(t *testing.T) {
+			opt := tt.opts
 			builder := NewTableBuilder(opt)
 			filename := fmt.Sprintf("%s%c%d.sst", os.TempDir(), os.PathSeparator, rand.Uint32())
 			f, err := y.OpenSyncedFile(filename, true)
@@ -77,13 +107,14 @@ func TestTableIndex(t *testing.T) {
 				}
 				builder.Add(k, vs, 0)
 			}
-			_, err = f.Write(builder.Finish())
+			_, err = f.Write(builder.Finish(false))
 			require.NoError(t, err, "unable to write to file")
 
 			tbl, err := OpenTable(f, opt)
 			require.NoError(t, err, "unable to open table")
+
 			if opt.DataKey == nil {
-				// key id is zero if thre is no datakey.
+				// key id is zero if there is no datakey.
 				require.Equal(t, tbl.KeyID(), uint64(0))
 			}
 
@@ -94,13 +125,13 @@ func TestTableIndex(t *testing.T) {
 			}
 			f.Close()
 			require.NoError(t, os.RemoveAll(filename))
-	}
-	})
+		})
+	}
 }
 
 func TestInvalidCompression(t *testing.T) {
 	keyPrefix := "key"
-	opts := Options{Compression: options.ZSTD}
+	opts := Options{BlockSize: 4 << 10, Compression: options.ZSTD}
 	f := buildTestTable(t, keyPrefix, 1000, opts)
 	t.Run("with correct decompression algo", func(t *testing.T) {
 		_, err := OpenTable(f, opts)
@@ -126,19 +157,22 @@ func BenchmarkBuilder(b *testing.B) {
 
 	keysCount := 1300000 // This number of entries consumes ~64MB of memory.
 
+	var keyList [][]byte
+	for i := 0; i < keysCount; i++ {
+		keyList = append(keyList, key(i))
+	}
 	bench := func(b *testing.B, opt *Options) {
-		// KeyCount * (keySize + ValSize)
 		b.SetBytes(int64(keysCount) * (32 + 32))
+		opt.BlockSize = 4 * 1024
+		opt.BloomFalsePositive = 0.01
+		opt.TableSize = 5 << 20
+		b.ResetTimer()
+
 		for i := 0; i < b.N; i++ {
-			opt.BlockSize = 4 * 1024
-			opt.BloomFalsePositive = 0.01
 			builder := NewTableBuilder(*opt)
-
-			for i := 0; i < keysCount; i++ {
-				builder.Add(key(i), vs, 0)
+			for j := 0; j < keysCount; j++ {
+				builder.Add(keyList[j], vs, 0)
 			}
-
-			_ = builder.Finish()
+			_ = builder.Finish(false)
 		}
 	}
 
@@ -147,6 +181,13 @@ func BenchmarkBuilder(b *testing.B) {
 		opt.Compression = options.None
 		bench(b, &opt)
 	})
+	b.Run("encryption", func(b *testing.B) {
+		var opt Options
+		key := make([]byte, 32)
+		rand.Read(key)
+		opt.DataKey = &pb.DataKey{Data: key}
+		bench(b, &opt)
+	})
 	b.Run("zstd compression", func(b *testing.B) {
 		var opt Options
 		opt.Compression = options.ZSTD
diff --git a/table/iterator.go b/table/iterator.go
index 8f46fe1b5..d48e58138 100644
--- a/table/iterator.go
+++ b/table/iterator.go
@@ -33,6 +33,7 @@ type blockIterator struct {
 	key          []byte
 	val          []byte
 	entryOffsets []uint32
+	block        *block
 
 	// prevOverlap stores the overlap of the previous key with the base key.
 	// This avoids unnecessary copy of base key when the overlap is same for multiple keys.
@@ -40,6 +41,11 @@ type blockIterator struct {
 }
 
 func (itr *blockIterator) setBlock(b *block) {
+	// Decrement the ref for the old block. If the old block was compressed, we
+	// might be able to reuse it.
+	itr.block.decrRef()
+
+	itr.block = b
 	itr.err = nil
 	itr.idx = 0
 	itr.baseKey = itr.baseKey[:0]
@@ -67,6 +73,7 @@ func (itr *blockIterator) setIdx(i int) {
 		baseHeader.Decode(itr.data)
 		itr.baseKey = itr.data[headerSize : headerSize+baseHeader.diff]
 	}
+
 	var endOffset int
 	// idx points to the last entry in the block.
 	if itr.idx+1 == len(itr.entryOffsets) {
@@ -101,7 +108,9 @@ func (itr *blockIterator) Error() error {
 	return itr.err
 }
 
-func (itr *blockIterator) Close() {}
+func (itr *blockIterator) Close() {
+	itr.block.decrRef()
+}
 
 var (
 	origin = 0
@@ -171,6 +180,7 @@ func (t *Table) NewIterator(reversed bool) *Iterator {
 
 // Close closes the iterator (and it must be called).
 func (itr *Iterator) Close() error {
+	itr.bi.Close()
 	return itr.t.DecrRef()
 }
diff --git a/table/table.go b/table/table.go
index e459524f5..46f82d134 100644
--- a/table/table.go
+++ b/table/table.go
@@ -56,6 +56,9 @@ const sizeOfOffsetStruct int64 = 3*8 + // key array take 3 words
 type Options struct {
 	// Options for Opening/Building Table.
 
+	// Maximum size of the table.
+	TableSize uint64
+
 	// ChkMode is the checksum verification mode for Table.
 	ChkMode options.ChecksumVerificationMode
 
@@ -182,15 +185,71 @@ func (t *Table) DecrRef() error {
 	return nil
 }
 
+// BlockEvictHandler is used to reuse the byte slice stored in the block on cache eviction.
+func BlockEvictHandler(value interface{}) {
+	if b, ok := value.(*block); ok {
+		b.decrRef()
+	}
+}
+
 type block struct {
 	offset            int
 	data              []byte
 	checksum          []byte
-	entriesIndexStart int // start index of entryOffsets list
-	entryOffsets      []uint32
-	chkLen            int // checksum length
+	entriesIndexStart int      // start index of entryOffsets list
+	entryOffsets      []uint32 // used to binary search an entry in the block.
+	chkLen            int      // checksum length.
+	freeMe            bool     // used to determine if the block should be reused.
+	ref               int32
+}
+
+var NumBlocks int32
+
+// incrRef increments the ref of a block and returns a bool indicating if the
+// increment was successful. A true value indicates that the block can be used.
+func (b *block) incrRef() bool {
+	for {
+		// We can't blindly add 1 to ref. We need to check whether it has
+		// reached zero first, because if it did, then we should absolutely not
+		// use this block.
+		ref := atomic.LoadInt32(&b.ref)
+		// The ref would not be equal to 0 unless the existing
+		// block got evicted before this line. If the ref is zero, it means that
+		// the block is already added to the blockPool and cannot be used
+		// anymore. The ref of a new block is 1, so the following condition will
+		// be true only if the block got reused before we could increment its
+		// ref.
+		if ref == 0 {
+			return false
+		}
+		// Increment the ref only if it is not zero and has not changed between
+		// the time we read it and the time we're updating it.
+		if atomic.CompareAndSwapInt32(&b.ref, ref, ref+1) {
+			return true
+		}
+	}
+}
+
+func (b *block) decrRef() {
+	if b == nil {
+		return
+	}
+
+	// Insert the []byte into the pool only if the block is reusable. When a block
+	// is reusable, a new []byte is used for decompression and this []byte can
+	// be reused.
+	// In case of an uncompressed block, the []byte is a reference to the
+	// table.mmap []byte slice. Any attempt to write data to the mmap []byte
+	// will lead to a SEGFAULT.
+	if atomic.AddInt32(&b.ref, -1) == 0 {
+		if b.freeMe {
+			y.Free(b.data)
+		}
+		atomic.AddInt32(&NumBlocks, -1)
+		// blockPool.Put(&b.data)
+	}
+	y.AssertTrue(atomic.LoadInt32(&b.ref) >= 0)
+}
+
 func (b *block) size() int64 {
 	return int64(3*intSize /* Size of the offset, entriesIndexStart and chkLen */ +
 		cap(b.data) + cap(b.checksum) + cap(b.entryOffsets)*4)
@@ -209,6 +268,11 @@ func (b block) verifyCheckSum() error {
 // -- consider t.Close() instead). The fd has to writeable because we call Truncate on it before
 // deleting. Checksum for all blocks of table is verified based on value of chkMode.
 func OpenTable(fd *os.File, opts Options) (*Table, error) {
+	// BlockSize is used to compute the approximate size of the decompressed
+	// block. It should not be zero if the table is compressed.
+	if opts.BlockSize == 0 && opts.Compression != options.None {
+		return nil, errors.New("Block size cannot be zero")
+	}
 	fileInfo, err := fd.Stat()
 	if err != nil {
 		// It's OK to ignore fd.Close() errs in this function because we have only read
@@ -448,6 +512,9 @@ func calculateOffsetsSize(offsets []*pb.BlockOffset) int64 {
 	return totalSize + 3*8
 }
 
+// block returns a new block. Each block holds a ref, and the byte
+// slice stored in the block will be reused when the ref becomes zero. The
+// caller should release the block by calling block.decrRef() on it.
 func (t *Table) block(idx int) (*block, error) {
 	y.AssertTruef(idx >= 0, "idx=%d", idx)
 	if idx >= t.noOfBlocks {
@@ -457,7 +524,12 @@ func (t *Table) block(idx int) (*block, error) {
 		key := t.blockCacheKey(idx)
 		blk, ok := t.opt.Cache.Get(key)
 		if ok && blk != nil {
-			return blk.(*block), nil
+			// Use the block only if the increment was successful. The block
+			// could get evicted from the cache between the Get() call and the
+			// incrRef() call.
+			if b := blk.(*block); b.incrRef() {
+				return b, nil
+			}
 		}
 	}
 
@@ -465,7 +537,11 @@ func (t *Table) block(idx int) (*block, error) {
 	ko := t.blockOffsets()[idx]
 	blk := &block{
 		offset: int(ko.Offset),
+		ref:    1,
 	}
+	defer blk.decrRef() // Deal with any errors, where blk would not be returned.
+	atomic.AddInt32(&NumBlocks, 1)
+
 	var err error
 	if blk.data, err = t.read(blk.offset, int(ko.Len)); err != nil {
 		return nil, errors.Wrapf(err,
@@ -479,8 +555,7 @@ func (t *Table) block(idx int) (*block, error) {
 		}
 	}
 
-	blk.data, err = t.decompressData(blk.data)
-	if err != nil {
+	if err = t.decompress(blk); err != nil {
 		return nil, errors.Wrapf(err,
 			"failed to decode compressed data in file: %s at offset: %d, len: %d",
 			t.fd.Name(), blk.offset, ko.Len)
@@ -493,7 +568,7 @@ func (t *Table) block(idx int) (*block, error) {
 	// Checksum length greater than block size could happen if the table was compressed and
 	// it was opened with an incorrect compression algorithm (or the data was corrupted).
 	if blk.chkLen > len(blk.data) {
-		return nil, errors.New("invalid checksum length. Either the data is" +
+		return nil, errors.New("invalid checksum length. Either the data is " +
			"corrupted or the table options are incorrectly set")
 	}
 
@@ -520,9 +595,20 @@ func (t *Table) block(idx int) (*block, error) {
 			return nil, err
 		}
 	}
+
+	blk.incrRef()
 	if t.opt.Cache != nil && t.opt.KeepBlocksInCache {
 		key := t.blockCacheKey(idx)
-		t.opt.Cache.Set(key, blk, blk.size())
+		// incrRef should never return false here because we're calling it on a
+		// new block with ref=1.
+		y.AssertTrue(blk.incrRef())
+
+		// Decrement the block ref if we could not insert it in the cache.
+		if !t.opt.Cache.Set(key, blk, blk.size()) {
+			blk.decrRef()
+		}
+		// We have added an OnReject func in our cache, which gets called in case the block is not
+		// admitted to the cache. So, every block would be accounted for.
	}
 	return blk, nil
 }
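The incrRef loop above is the standard try-acquire-unless-zero refcount. A plain atomic increment could resurrect a block whose count already reached zero, after decrRef may have freed b.data; the CAS retries until it either observes zero (the caller must then re-read the block from disk) or wins the increment. Its core, isolated as a sketch:

```go
func tryAcquire(ref *int32) bool {
	for {
		r := atomic.LoadInt32(ref)
		if r == 0 {
			return false // already released; the buffer may be gone
		}
		if atomic.CompareAndSwapInt32(ref, r, r+1) {
			return true // safe to use until the matching release
		}
	}
}
```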
@@ -644,7 +730,8 @@ func (t *Table) VerifyChecksum() error {
 			return y.Wrapf(err, "checksum validation failed for table: %s, block: %d, offset:%d",
 				t.Filename(), i, os.Offset)
 		}
-
+		// We should not call incrRef here, because the block already has one ref when created.
+		defer b.decrRef()
 		// OnBlockRead or OnTableAndBlockRead, we don't need to call verify checksum
 		// on block, verification would be done while reading block itself.
 		if !(t.opt.ChkMode == options.OnBlockRead || t.opt.ChkMode == options.OnTableAndBlockRead) {
@@ -655,7 +742,6 @@ func (t *Table) VerifyChecksum() error {
 			}
 		}
 	}
-
 	return nil
 }
 
@@ -680,7 +766,13 @@ func (t *Table) decrypt(data []byte) ([]byte, error) {
 	iv := data[len(data)-aes.BlockSize:]
 	// Rest all bytes are data.
 	data = data[:len(data)-aes.BlockSize]
-	return y.XORBlock(data, t.opt.DataKey.Data, iv)
+
+	// TODO: Check if this is done via Calloc. Otherwise, we'll have a memory leak.
+	dst := make([]byte, len(data))
+	if err := y.XORBlock(dst, data, t.opt.DataKey.Data, iv); err != nil {
+		return nil, errors.Wrapf(err, "while decrypt")
+	}
+	return dst, nil
 }
 
 // ParseFileID reads the file id out of a filename.
@@ -710,15 +802,42 @@ func NewFilename(id uint64, dir string) string {
 	return filepath.Join(dir, IDToFilename(id))
 }
 
-// decompressData decompresses the given data.
-func (t *Table) decompressData(data []byte) ([]byte, error) {
+// decompress decompresses the data stored in a block.
+func (t *Table) decompress(b *block) error {
+	var dst []byte
+	var err error
+
 	switch t.opt.Compression {
 	case options.None:
-		return data, nil
+		// Nothing to be done here.
+		return nil
 	case options.Snappy:
-		return snappy.Decode(nil, data)
+		if sz, err := snappy.DecodedLen(b.data); err == nil {
+			dst = y.Calloc(sz)
+		} else {
+			dst = y.Calloc(len(b.data) * 4) // Take a guess.
+		}
+		b.data, err = snappy.Decode(dst, b.data)
+		if err != nil {
+			y.Free(dst)
+			return errors.Wrap(err, "failed to decompress")
+		}
 	case options.ZSTD:
-		return y.ZSTDDecompress(nil, data)
+		sz := int(float64(t.opt.BlockSize) * 1.2)
+		dst = y.Calloc(sz)
+		b.data, err = y.ZSTDDecompress(dst, b.data)
+		if err != nil {
+			y.Free(dst)
+			return errors.Wrap(err, "failed to decompress")
+		}
+	default:
+		return errors.New("Unsupported compression type")
+	}
+
+	if len(b.data) > 0 && len(dst) > 0 && &dst[0] != &b.data[0] {
+		y.Free(dst)
+	} else {
+		b.freeMe = true
 	}
-	return nil, errors.New("Unsupported compression type")
+	return nil
 }
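decompress now sizes its destination up front (snappy.DecodedLen when the header is readable, BlockSize x 1.2 for zstd) so decoded bytes land in a Calloc'd buffer rather than a fresh GC allocation, and freeMe records whether the block owns that buffer. The snappy half of the pattern in isolation (a sketch using the same y helpers):

```go
func decompressSnappy(src []byte) (out []byte, ownsDst bool, err error) {
	sz, err := snappy.DecodedLen(src)
	if err != nil {
		sz = len(src) * 4 // fall back to a guess, as the diff does
	}
	dst := y.Calloc(sz)
	out, err = snappy.Decode(dst, src)
	if err != nil {
		y.Free(dst)
		return nil, false, err
	}
	// If out aliases dst, the caller owns dst and must y.Free it when the
	// block's ref count drops to zero (the freeMe flag above).
	ownsDst = len(out) > 0 && &out[0] == &dst[0]
	if !ownsDst {
		y.Free(dst) // snappy allocated its own slice; release ours
	}
	return out, ownsDst, nil
}
```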
diff --git a/table/table_test.go b/table/table_test.go
index a31ad1ce9..4b93332fd 100644
--- a/table/table_test.go
+++ b/table/table_test.go
@@ -37,11 +37,6 @@ import (
 	"github.com/stretchr/testify/require"
 )
 
-const (
-	KB = 1024
-	MB = KB * 1024
-)
-
 func key(prefix string, i int) string {
 	return prefix + fmt.Sprintf("%04d", i)
 }
@@ -87,7 +82,7 @@ func buildTable(t *testing.T, keyValues [][]string, opts Options) *os.File {
 		y.AssertTrue(len(kv) == 2)
 		b.Add(y.KeyWithTs([]byte(kv[0]), 0), y.ValueStruct{Value: []byte(kv[1]), Meta: 'A', UserMeta: 0}, 0)
 	}
-	_, err = f.Write(b.Finish())
+	_, err = f.Write(b.Finish(false))
 	require.NoError(t, err, "writing to file failed")
 	f.Close()
 	f, _ = y.OpenSyncedFile(filename, true)
@@ -355,7 +350,7 @@ func TestIterateBackAndForth(t *testing.T) {
 
 	it.seekToFirst()
 	k = it.Key()
-	require.EqualValues(t, key("key", 0), y.ParseKey(k))
+	require.EqualValues(t, key("key", 0), string(y.ParseKey(k)))
 }
 
 func TestUniIterator(t *testing.T) {
@@ -700,7 +695,8 @@ func TestTableBigValues(t *testing.T) {
 	require.NoError(t, err, "unable to create file")
 
 	n := 100 // Insert 100 keys.
-	opts := Options{Compression: options.ZSTD, BlockSize: 4 * 1024, BloomFalsePositive: 0.01}
+	opts := Options{Compression: options.ZSTD, BlockSize: 4 * 1024, BloomFalsePositive: 0.01,
+		TableSize: uint64(n) * 1 << 20}
 	builder := NewTableBuilder(opts)
 	for i := 0; i < n; i++ {
 		key := y.KeyWithTs([]byte(key("", i)), 0)
@@ -708,7 +704,7 @@ func TestTableBigValues(t *testing.T) {
 		builder.Add(key, vs, 0)
 	}
 
-	_, err = f.Write(builder.Finish())
+	_, err = f.Write(builder.Finish(false))
 	require.NoError(t, err, "unable to write to file")
 	tbl, err := OpenTable(f, opts)
 	require.NoError(t, err, "unable to open table")
@@ -793,7 +789,7 @@ func BenchmarkReadAndBuild(b *testing.B) {
 				vs := it.Value()
 				newBuilder.Add(it.Key(), vs, 0)
 			}
-			newBuilder.Finish()
+			newBuilder.Finish(false)
 		}()
 	}
 }
@@ -822,7 +818,7 @@ func BenchmarkReadMerged(b *testing.B) {
 			v := fmt.Sprintf("%d", id)
 			builder.Add([]byte(k), y.ValueStruct{Value: []byte(v), Meta: 123, UserMeta: 0}, 0)
 		}
-		_, err = f.Write(builder.Finish())
+		_, err = f.Write(builder.Finish(false))
 		require.NoError(b, err, "unable to write to file")
 		tbl, err := OpenTable(f, opts)
 		y.Check(err)
@@ -914,7 +910,7 @@ func getTableForBenchmarks(b *testing.B, count int, cache *ristretto.Cache) *Tab
 		builder.Add([]byte(k), y.ValueStruct{Value: []byte(v)}, 0)
 	}
 
-	_, err = f.Write(builder.Finish())
+	_, err = f.Write(builder.Finish(false))
 	require.NoError(b, err, "unable to write to file")
 	tbl, err := OpenTable(f, opts)
 	require.NoError(b, err, "unable to open table")
diff --git a/value.go b/value.go
index cef996a79..fd29f5a7a 100644
--- a/value.go
+++ b/value.go
@@ -105,13 +105,13 @@ func (lf *logFile) encodeEntry(e *Entry, buf *bytes.Buffer, offset uint32) (int,
 		userMeta: e.UserMeta,
 	}
 
+	hash := crc32.New(y.CastagnoliCrcTable)
+	writer := io.MultiWriter(buf, hash)
+
 	// encode header.
 	var headerEnc [maxHeaderSize]byte
 	sz := h.Encode(headerEnc[:])
-	y.Check2(buf.Write(headerEnc[:sz]))
-	// write hash.
-	hash := crc32.New(y.CastagnoliCrcTable)
-	y.Check2(hash.Write(headerEnc[:sz]))
+	y.Check2(writer.Write(headerEnc[:sz]))
 	// we'll encrypt only key and value.
 	if lf.encryptionEnabled() {
 		// TODO: no need to allocate the bytes. we can calculate the encrypted buf one by one
@@ -120,25 +120,14 @@ func (lf *logFile) encodeEntry(e *Entry, buf *bytes.Buffer, offset uint32) (int,
 		eBuf := make([]byte, 0, len(e.Key)+len(e.Value))
 		eBuf = append(eBuf, e.Key...)
 		eBuf = append(eBuf, e.Value...)
-		var err error
-		eBuf, err = y.XORBlock(eBuf, lf.dataKey.Data, lf.generateIV(offset))
-		if err != nil {
+		if err := y.XORBlockStream(
+			writer, eBuf, lf.dataKey.Data, lf.generateIV(offset)); err != nil {
 			return 0, y.Wrapf(err, "Error while encoding entry for vlog.")
 		}
-		// write encrypted buf.
-		y.Check2(buf.Write(eBuf))
-		// write the hash.
-		y.Check2(hash.Write(eBuf))
 	} else {
 		// Encryption is disabled so writing directly to the buffer.
-		// write key.
-		y.Check2(buf.Write(e.Key))
-		// write key hash.
-		y.Check2(hash.Write(e.Key))
-		// write value.
-		y.Check2(buf.Write(e.Value))
-		// write value hash.
-		y.Check2(hash.Write(e.Value))
+		y.Check2(writer.Write(e.Key))
+		y.Check2(writer.Write(e.Value))
 	}
 	// write crc32 hash.
 	var crcBuf [crc32.Size]byte
@@ -172,7 +161,7 @@ func (lf *logFile) decodeEntry(buf []byte, offset uint32) (*Entry, error) {
 }
 
 func (lf *logFile) decryptKV(buf []byte, offset uint32) ([]byte, error) {
-	return y.XORBlock(buf, lf.dataKey.Data, lf.generateIV(offset))
+	return y.XORBlockAllocate(buf, lf.dataKey.Data, lf.generateIV(offset))
 }
 
 // KeyID returns datakey's ID.
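The encodeEntry rewrite is a pure de-duplication: previously every field was written twice, once into the buffer and once into the CRC. io.MultiWriter folds each pair into a single write that feeds both sinks. The resulting pattern, using the same names as the diff:

```go
hash := crc32.New(y.CastagnoliCrcTable)
writer := io.MultiWriter(buf, hash) // one Write feeds both the buffer and the CRC
y.Check2(writer.Write(headerEnc[:sz]))
y.Check2(writer.Write(e.Key))
y.Check2(writer.Write(e.Value))
var crcBuf [crc32.Size]byte
binary.BigEndian.PutUint32(crcBuf[:], hash.Sum32())
y.Check2(buf.Write(crcBuf[:])) // the checksum itself goes only to the buffer
```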
diff --git a/y/calloc.go b/y/calloc.go
new file mode 100644
index 000000000..d08f97d84
--- /dev/null
+++ b/y/calloc.go
@@ -0,0 +1,67 @@
+// Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use
+// of this source code is governed by a BSD-style license that can be found in
+// the LICENSE file.
+
+package y
+
+// #include <stdlib.h>
+import "C"
+import (
+	"sync/atomic"
+	"unsafe"
+)
+
+// The go:linkname directive provides backdoor access to private functions in
+// the runtime. Below we're accessing the throw function.
+
+//go:linkname throw runtime.throw
+func throw(s string)
+
+// TODO(peter): Rather than relying on C malloc/free, we could fork the Go
+// runtime page allocator and allocate large chunks of memory using mmap or
+// similar.
+
+var NumAllocs int64
+
+// Calloc allocates a slice of size n. The returned slice is from manually managed
+// memory and MUST be released by calling Free. Failure to do so will result in
+// a memory leak.
+func Calloc(n int) []byte {
+	if n == 0 {
+		return make([]byte, 0)
+	}
+	// We need to be conscious of the Cgo pointer passing rules:
+	//
+	//   https://golang.org/cmd/cgo/#hdr-Passing_pointers
+	//
+	//   ...
+	//   Note: the current implementation has a bug. While Go code is permitted
+	//   to write nil or a C pointer (but not a Go pointer) to C memory, the
+	//   current implementation may sometimes cause a runtime error if the
+	//   contents of the C memory appear to be a Go pointer. Therefore, avoid
+	//   passing uninitialized C memory to Go code if the Go code is going to
+	//   store pointer values in it. Zero out the memory in C before passing it
+	//   to Go.
+	ptr := C.calloc(C.size_t(n), 1)
+	if ptr == nil {
+		// NB: throw is like panic, except it guarantees the process will be
+		// terminated. The call below is exactly what the Go runtime invokes when
+		// it cannot allocate memory.
+		throw("out of memory")
+	}
+	atomic.AddInt64(&NumAllocs, int64(n))
+	// Interpret the C pointer as a pointer to a Go array, then slice.
+	return (*[MaxArrayLen]byte)(unsafe.Pointer(ptr))[:n:n]
+}
+
+// Free frees the specified slice.
+func Free(b []byte) {
+	if sz := cap(b); sz != 0 {
+		if len(b) == 0 {
+			b = b[:cap(b)]
+		}
+		ptr := unsafe.Pointer(&b[0])
+		C.free(ptr)
+		atomic.AddInt64(&NumAllocs, -int64(sz))
+	}
+}
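A usage note on the allocator this PR introduces: on cgo builds, Calloc returns memory that the Go GC neither tracks nor reclaims, so every call needs a paired Free, and (per the cgo pointer rules quoted above) the buffer must hold plain bytes, never Go pointers:

```go
buf := y.Calloc(4 << 10) // zeroed C memory, invisible to the Go GC
defer y.Free(buf)        // mandatory: nothing else will reclaim it
copy(buf, []byte("payload")) // plain bytes only; never store Go pointers here
```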
diff --git a/y/calloc_32bit.go b/y/calloc_32bit.go
new file mode 100644
index 000000000..94f17c086
--- /dev/null
+++ b/y/calloc_32bit.go
@@ -0,0 +1,12 @@
+// Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use
+// of this source code is governed by a BSD-style license that can be found in
+// the LICENSE file.
+
+// +build 386 amd64p32 arm armbe mips mipsle mips64p32 mips64p32le ppc sparc
+
+package y
+
+const (
+	// MaxArrayLen is a safe maximum length for slices on this architecture.
+	MaxArrayLen = 1<<31 - 1
+)
diff --git a/y/calloc_64bit.go b/y/calloc_64bit.go
new file mode 100644
index 000000000..bf8d774fb
--- /dev/null
+++ b/y/calloc_64bit.go
@@ -0,0 +1,12 @@
+// Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use
+// of this source code is governed by a BSD-style license that can be found in
+// the LICENSE file.
+
+// +build amd64 arm64 arm64be ppc64 ppc64le mips64 mips64le s390x sparc64
+
+package y
+
+const (
+	// MaxArrayLen is a safe maximum length for slices on this architecture.
+	MaxArrayLen = 1<<50 - 1
+)
diff --git a/y/calloc_nocgo.go b/y/calloc_nocgo.go
new file mode 100644
index 000000000..03d518b7c
--- /dev/null
+++ b/y/calloc_nocgo.go
@@ -0,0 +1,19 @@
+// Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use
+// of this source code is governed by a BSD-style license that can be found in
+// the LICENSE file.
+
+// +build !cgo
+
+package y
+
+// Provides versions of Calloc and Free when cgo is not available (e.g. cross
+// compilation).
+
+// Calloc allocates a slice of size n.
+func Calloc(n int) []byte {
+	return make([]byte, n)
+}
+
+// Free frees the specified slice.
+func Free(b []byte) {
+}
diff --git a/y/calloc_test.go b/y/calloc_test.go
new file mode 100644
index 000000000..171422bd0
--- /dev/null
+++ b/y/calloc_test.go
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2020 Dgraph Labs, Inc. and Contributors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package y
+
+import (
+	"math/rand"
+	"sync"
+	"testing"
+	"time"
+)
+
+// $ go test -failfast -run xxx -bench . -benchmem -count 10 > out.txt
+// $ benchstat out.txt
+// name                 time/op
+// Allocation/Pool-8    200µs ± 5%
+// Allocation/Calloc-8  100µs ±11%
+//
+// name                 alloc/op
+// Allocation/Pool-8     477B ±29%
+// Allocation/Calloc-8  4.00B ± 0%
+//
+// name                 allocs/op
+// Allocation/Pool-8     1.00 ± 0%
+// Allocation/Calloc-8   0.00
+func BenchmarkAllocation(b *testing.B) {
+	b.Run("Pool", func(b *testing.B) {
+		pool := sync.Pool{
+			New: func() interface{} {
+				return make([]byte, 4<<10)
+			},
+		}
+		b.RunParallel(func(pb *testing.PB) {
+			source := rand.NewSource(time.Now().UnixNano())
+			r := rand.New(source)
+			for pb.Next() {
+				x := pool.Get().([]byte)
+				sz := r.Intn(100) << 10
+				if len(x) < sz {
+					x = make([]byte, sz)
+				}
+				r.Read(x)
+				pool.Put(x)
+			}
+		})
+	})
+
+	b.Run("Calloc", func(b *testing.B) {
+		b.RunParallel(func(pb *testing.PB) {
+			source := rand.NewSource(time.Now().UnixNano())
+			r := rand.New(source)
+			for pb.Next() {
+				sz := r.Intn(100) << 10
+				x := Calloc(sz)
+				r.Read(x)
+				Free(x)
+			}
+		})
+	})
+}
diff --git a/y/encrypt.go b/y/encrypt.go
index dbfe019f1..34b4729b6 100644
--- a/y/encrypt.go
+++ b/y/encrypt.go
@@ -17,15 +17,29 @@
 package y
 
 import (
+	"bytes"
 	"crypto/aes"
 	"crypto/cipher"
 	"crypto/rand"
+	"io"
+
+	"github.com/pkg/errors"
 )
 
 // XORBlock encrypts the given data with AES and XOR's with IV.
 // Can be used for both encryption and decryption. IV is of
 // AES block size.
-func XORBlock(src, key, iv []byte) ([]byte, error) {
+func XORBlock(dst, src, key, iv []byte) error {
+	block, err := aes.NewCipher(key)
+	if err != nil {
+		return err
+	}
+	stream := cipher.NewCTR(block, iv)
+	stream.XORKeyStream(dst, src)
+	return nil
+}
+
+func XORBlockAllocate(src, key, iv []byte) ([]byte, error) {
 	block, err := aes.NewCipher(key)
 	if err != nil {
 		return nil, err
@@ -36,6 +50,17 @@ func XORBlock(src, key, iv []byte) ([]byte, error) {
 	return dst, nil
 }
 
+func XORBlockStream(w io.Writer, src, key, iv []byte) error {
+	block, err := aes.NewCipher(key)
+	if err != nil {
+		return err
+	}
+	stream := cipher.NewCTR(block, iv)
+	sw := cipher.StreamWriter{S: stream, W: w}
+	_, err = io.Copy(sw, bytes.NewReader(src))
+	return errors.Wrapf(err, "XORBlockStream")
+}
+
 // GenerateIV generates IV.
 func GenerateIV() ([]byte, error) {
 	iv := make([]byte, aes.BlockSize)
diff --git a/y/iterator.go b/y/iterator.go
index 6d0f677c0..7c9b21194 100644
--- a/y/iterator.go
+++ b/y/iterator.go
@@ -64,11 +64,12 @@ func (v *ValueStruct) Decode(b []byte) {
 }
 
 // Encode expects a slice of length at least v.EncodedSize().
-func (v *ValueStruct) Encode(b []byte) {
+func (v *ValueStruct) Encode(b []byte) uint32 {
 	b[0] = v.Meta
 	b[1] = v.UserMeta
 	sz := binary.PutUvarint(b[2:], v.ExpiresAt)
-	copy(b[2+sz:], v.Value)
+	n := copy(b[2+sz:], v.Value)
+	return uint32(2 + sz + n)
 }
 
 // EncodeTo should be kept in sync with the Encode function above. The reason
@@ -79,6 +80,7 @@ func (v *ValueStruct) EncodeTo(buf *bytes.Buffer) {
 	buf.WriteByte(v.UserMeta)
 	var enc [binary.MaxVarintLen64]byte
 	sz := binary.PutUvarint(enc[:], v.ExpiresAt)
+
 	buf.Write(enc[:sz])
 	buf.Write(v.Value)
 }
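A closing observation on the XORBlock family: AES-CTR reduces the cipher to a keystream XOR, so the same primitive serves encryption and decryption alike, which is why the registry, vlog, and table paths can share these three variants (write-into-dst, allocate, and stream). A round-trip sketch (key must be 16, 24, or 32 bytes and iv of aes.BlockSize; both hypothetical here):

```go
src := []byte("hello badger")
ct := make([]byte, len(src))
y.Check(y.XORBlock(ct, src, key, iv)) // encrypt

pt := make([]byte, len(ct))
y.Check(y.XORBlock(pt, ct, key, iv)) // decrypt: pt equals src again
```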