From e7b6e76f96e89740c3464769d1cc34780b61f19e Mon Sep 17 00:00:00 2001 From: balaji Date: Fri, 29 May 2020 02:23:44 +0530 Subject: [PATCH] Use cache for storing block offsets (#1336) Fixes https://github.com/dgraph-io/badger/issues/1335 Signed-off-by: Tiger --- iterator_test.go | 33 +++++++++++- options.go | 61 ++++++++++++++++++++--- table/builder_test.go | 4 +- table/iterator.go | 12 ++--- table/table.go | 113 +++++++++++++++++++++++++++++++++++------- table/table_test.go | 20 ++++++++ test.sh | 4 +- 7 files changed, 209 insertions(+), 38 deletions(-) diff --git a/iterator_test.go b/iterator_test.go index f9fddf913..cbaf6bec1 100644 --- a/iterator_test.go +++ b/iterator_test.go @@ -125,7 +125,7 @@ func TestPickSortTables(t *testing.T) { } func TestIteratePrefix(t *testing.T) { - runBadgerTest(t, nil, func(t *testing.T, db *DB) { + testIteratorPrefix := func(t *testing.T, db *DB) { bkey := func(i int) []byte { return []byte(fmt.Sprintf("%04d", i)) } @@ -198,7 +198,38 @@ func TestIteratePrefix(t *testing.T) { for i := 0; i < n; i++ { require.Equal(t, 1, countOneKey(bkey(i))) } + } + + t.Run("With Default options", func(t *testing.T) { + runBadgerTest(t, nil, func(t *testing.T, db *DB) { + testIteratorPrefix(t, db) + }) }) + + t.Run("With Block Offsets in Cache", func(t *testing.T) { + opts := getTestOptions("") + opts = opts.WithKeepBlockIndicesInCache(true) + runBadgerTest(t, &opts, func(t *testing.T, db *DB) { + testIteratorPrefix(t, db) + }) + }) + + t.Run("With Block Offsets and Blocks in Cache", func(t *testing.T) { + opts := getTestOptions("") + opts = opts.WithKeepBlockIndicesInCache(true).WithKeepBlocksInCache(true) + runBadgerTest(t, &opts, func(t *testing.T, db *DB) { + testIteratorPrefix(t, db) + }) + }) + + t.Run("With Blocks in Cache", func(t *testing.T) { + opts := getTestOptions("") + opts = opts.WithKeepBlocksInCache(true) + runBadgerTest(t, &opts, func(t *testing.T, db *DB) { + testIteratorPrefix(t, db) + }) + }) + } // go test -v -run=XXX 
-bench=BenchmarkIterate -benchtime=3s diff --git a/options.go b/options.go index e97ba4770..fedb7cd98 100644 --- a/options.go +++ b/options.go @@ -93,6 +93,12 @@ type Options struct { // ChecksumVerificationMode decides when db should verify checksums for SSTable blocks. ChecksumVerificationMode options.ChecksumVerificationMode + // KeepBlockIndicesInCache decides whether to keep the block offsets in the cache or not. + KeepBlockIndicesInCache bool + + // KeepBlocksInCache decides whether to keep the sst blocks in the cache or not. + KeepBlocksInCache bool + // Transaction start and commit timestamps are managed by end-user. // This is only useful for databases built on top of Badger (like Dgraph). // Not recommended for most users. @@ -157,19 +163,23 @@ func DefaultOptions(path string) Options { LogRotatesToFlush: 2, EncryptionKey: []byte{}, EncryptionKeyRotationDuration: 10 * 24 * time.Hour, // Default 10 days. + KeepBlocksInCache: false, + KeepBlockIndicesInCache: false, } } func buildTableOptions(opt Options) table.Options { return table.Options{ - TableSize: uint64(opt.MaxTableSize), - BlockSize: opt.BlockSize, - BloomFalsePositive: opt.BloomFalsePositive, - LoadBloomsOnOpen: opt.LoadBloomsOnOpen, - LoadingMode: opt.TableLoadingMode, - ChkMode: opt.ChecksumVerificationMode, - Compression: opt.Compression, - ZSTDCompressionLevel: opt.ZSTDCompressionLevel, + TableSize: uint64(opt.MaxTableSize), + BlockSize: opt.BlockSize, + BloomFalsePositive: opt.BloomFalsePositive, + LoadBloomsOnOpen: opt.LoadBloomsOnOpen, + LoadingMode: opt.TableLoadingMode, + ChkMode: opt.ChecksumVerificationMode, + Compression: opt.Compression, + ZSTDCompressionLevel: opt.ZSTDCompressionLevel, + KeepBlockIndicesInCache: opt.KeepBlockIndicesInCache, + KeepBlocksInCache: opt.KeepBlocksInCache, } } @@ -631,3 +641,38 @@ func (opt Options) WithLoadBloomsOnOpen(b bool) Options { opt.LoadBloomsOnOpen = b return opt } + +// WithKeepBlockIndicesInCache returns a new Option value with 
KeepBlockIndicesInCache set to the +// given value. +// +// When this option is set badger will store the block offsets in a cache along with the blocks. +// The size of the cache is determined by the MaxCacheSize option. If the MaxCacheSize is set to +// zero, then MaxCacheSize is set to 100 MB. When indices are stored in the cache, the read +// performance might be affected but the cache limits the amount of memory used by the indices. +// +// The default value of KeepBlockIndicesInCache is false. +func (opt Options) WithKeepBlockIndicesInCache(val bool) Options { + opt.KeepBlockIndicesInCache = val + + if val && opt.MaxCacheSize == 0 { + opt.MaxCacheSize = 100 << 20 + } + return opt +} + +// WithKeepBlocksInCache returns a new Option value with KeepBlocksInCache set to the +// given value. +// +// When this option is set badger will store the blocks in the cache. The size of the cache is +// determined by the MaxCacheSize option. If the MaxCacheSize is set to zero, +// then MaxCacheSize is set to 100 MB. +// +// The default value of KeepBlocksInCache is false. 
+func (opt Options) WithKeepBlocksInCache(val bool) Options { + opt.KeepBlocksInCache = val + + if val && opt.MaxCacheSize == 0 { + opt.MaxCacheSize = 100 << 20 + } + return opt +} diff --git a/table/builder_test.go b/table/builder_test.go index 5158a159a..dfa7f63fb 100644 --- a/table/builder_test.go +++ b/table/builder_test.go @@ -118,8 +118,8 @@ func TestTableIndex(t *testing.T) { } // Ensure index is built correctly - require.Equal(t, blockCount, len(tbl.blockIndex)) - for i, ko := range tbl.blockIndex { + require.Equal(t, blockCount, tbl.noOfBlocks) + for i, ko := range tbl.readTableIndex().Offsets { require.Equal(t, ko.Key, blockFirstKeys[i]) } f.Close() diff --git a/table/iterator.go b/table/iterator.go index c987862ea..d48e58138 100644 --- a/table/iterator.go +++ b/table/iterator.go @@ -195,7 +195,7 @@ func (itr *Iterator) Valid() bool { } func (itr *Iterator) seekToFirst() { - numBlocks := len(itr.t.blockIndex) + numBlocks := itr.t.noOfBlocks if numBlocks == 0 { itr.err = io.EOF return @@ -212,7 +212,7 @@ func (itr *Iterator) seekToFirst() { } func (itr *Iterator) seekToLast() { - numBlocks := len(itr.t.blockIndex) + numBlocks := itr.t.noOfBlocks if numBlocks == 0 { itr.err = io.EOF return @@ -249,8 +249,8 @@ func (itr *Iterator) seekFrom(key []byte, whence int) { case current: } - idx := sort.Search(len(itr.t.blockIndex), func(idx int) bool { - ko := itr.t.blockIndex[idx] + idx := sort.Search(itr.t.noOfBlocks, func(idx int) bool { + ko := itr.t.blockOffsets()[idx] return y.CompareKeys(ko.Key, key) > 0 }) if idx == 0 { @@ -269,7 +269,7 @@ func (itr *Iterator) seekFrom(key []byte, whence int) { itr.seekHelper(idx-1, key) if itr.err == io.EOF { // Case 1. Need to visit block[idx]. - if idx == len(itr.t.blockIndex) { + if idx == itr.t.noOfBlocks { // If idx == len(itr.t.blockIndex), then input key is greater than ANY element of table. // There's nothing we can do. Valid() should return false as we seek to end of table. 
return @@ -297,7 +297,7 @@ func (itr *Iterator) seekForPrev(key []byte) { func (itr *Iterator) next() { itr.err = nil - if itr.bpos >= len(itr.t.blockIndex) { + if itr.bpos >= itr.t.noOfBlocks { itr.err = io.EOF return } diff --git a/table/table.go b/table/table.go index da7bd44c8..8b88a69a9 100644 --- a/table/table.go +++ b/table/table.go @@ -45,6 +45,13 @@ import ( const fileSuffix = ".sst" const intSize = int(unsafe.Sizeof(int(0))) +// 1 word = 8 bytes +// sizeOfOffsetStruct is the size of pb.BlockOffset +const sizeOfOffsetStruct int64 = 3*8 + // key array take 3 words + 1*8 + // offset and len takes 1 word + 3*8 + // XXX_unrecognized array takes 3 word. + 1*8 // so far 7 words, in order to round the slab we're adding one more word. + // Options contains configurable options for Table/Builder. type Options struct { // Options for Opening/Building Table. @@ -81,6 +88,12 @@ type Options struct { // When LoadBloomsOnOpen is set, bloom filters will be read only when they are accessed. // Otherwise they will be loaded on table open. LoadBloomsOnOpen bool + + // KeepBlockIndicesInCache decides whether to keep the block offsets in the cache or not. + KeepBlockIndicesInCache bool + + // KeepBlocksInCache decides whether to keep the block in the cache or not. + KeepBlocksInCache bool } // TableInterface is useful for testing. @@ -116,6 +129,8 @@ type Table struct { IsInmemory bool // Set to true if the table is on level 0 and opened in memory. opt *Options + + noOfBlocks int // Total number of blocks. } // CompressionType returns the compression algorithm used for block compression. @@ -159,7 +174,7 @@ func (t *Table) DecrRef() error { return err } // Delete all blocks from the cache. - for i := range t.blockIndex { + for i := 0; i < t.noOfBlocks; i++ { t.opt.Cache.Del(t.blockCacheKey(i)) } // Delete bloom filter from the cache. 
@@ -332,11 +347,13 @@ func OpenInMemoryTable(data []byte, id uint64, opt *Options) (*Table, error) { } func (t *Table) initBiggestAndSmallest() error { - if err := t.readIndex(); err != nil { + var err error + var ko *pb.BlockOffset + if ko, err = t.readIndex(); err != nil { return errors.Wrapf(err, "failed to read index.") } - t.smallest = t.blockIndex[0].Key + t.smallest = ko.Key it2 := t.NewIterator(true) defer it2.Close() @@ -383,7 +400,9 @@ func (t *Table) readNoFail(off, sz int) []byte { return res } -func (t *Table) readIndex() error { +// readIndex reads the index and populate the necessary table fields and returns +// first block offset +func (t *Table) readIndex() (*pb.BlockOffset, error) { readPos := t.tableSize // Read checksum len from the last 4 bytes. @@ -391,7 +410,7 @@ func (t *Table) readIndex() error { buf := t.readNoFail(readPos, 4) checksumLen := int(y.BytesToU32(buf)) if checksumLen < 0 { - return errors.New("checksum length less than zero. Data corrupted") + return nil, errors.New("checksum length less than zero. Data corrupted") } // Read checksum. @@ -399,7 +418,7 @@ func (t *Table) readIndex() error { readPos -= checksumLen buf = t.readNoFail(readPos, checksumLen) if err := proto.Unmarshal(buf, expectedChk); err != nil { - return err + return nil, err } // Read index size from the footer. 
@@ -413,7 +432,7 @@ func (t *Table) readIndex() error { data := t.readNoFail(readPos, t.indexLen) if err := y.VerifyChecksum(data, expectedChk); err != nil { - return y.Wrapf(err, "failed to verify checksum for table: %s", t.Filename()) + return nil, y.Wrapf(err, "failed to verify checksum for table: %s", t.Filename()) } index := pb.TableIndex{} @@ -421,7 +440,7 @@ func (t *Table) readIndex() error { if t.shouldDecrypt() { var err error if data, err = t.decrypt(data); err != nil { - return y.Wrapf(err, + return nil, y.Wrapf(err, "Error while decrypting table index for the table %d in Table.readIndex", t.id) } } @@ -429,7 +448,7 @@ func (t *Table) readIndex() error { y.Check(err) t.estimatedSize = index.EstimatedSize - t.blockIndex = index.Offsets + t.noOfBlocks = len(index.Offsets) if t.opt.LoadBloomsOnOpen { t.bfLock.Lock() @@ -437,7 +456,47 @@ func (t *Table) readIndex() error { t.bfLock.Unlock() } - return nil + if t.opt.KeepBlockIndicesInCache && t.opt.Cache != nil { + t.opt.Cache.Set( + t.blockOffsetsCacheKey(), + index.Offsets, + calculateOffsetsSize(index.Offsets)) + + return index.Offsets[0], nil + } + + t.blockIndex = index.Offsets + return index.Offsets[0], nil +} + +// blockOffsets returns block offsets of this table. +func (t *Table) blockOffsets() []*pb.BlockOffset { + if !t.opt.KeepBlockIndicesInCache || t.opt.Cache == nil { + return t.blockIndex + } + + if val, ok := t.opt.Cache.Get(t.blockOffsetsCacheKey()); ok && val != nil { + return val.([]*pb.BlockOffset) + } + + ti := t.readTableIndex() + + t.opt.Cache.Set(t.blockOffsetsCacheKey(), ti.Offsets, calculateOffsetsSize(ti.Offsets)) + return ti.Offsets +} + +// calculateOffsetsSize returns the size of *pb.BlockOffset array +func calculateOffsetsSize(offsets []*pb.BlockOffset) int64 { + totalSize := sizeOfOffsetStruct * int64(len(offsets)) + + for _, ko := range offsets { + // add key size. + totalSize += int64(cap(ko.Key)) + // add XXX_unrecognized size. 
+ totalSize += int64(cap(ko.XXX_unrecognized)) + } + // Add three words for array size. + return totalSize + 3*8 } // block function return a new block. Each block holds a ref and the byte @@ -445,10 +504,10 @@ func (t *Table) readIndex() error { // caller should release the block by calling block.decrRef() on it. func (t *Table) block(idx int) (*block, error) { y.AssertTruef(idx >= 0, "idx=%d", idx) - if idx >= len(t.blockIndex) { + if idx >= t.noOfBlocks { return nil, errors.New("block out of index") } - if t.opt.Cache != nil { + if t.opt.Cache != nil && t.opt.KeepBlocksInCache { key := t.blockCacheKey(idx) blk, ok := t.opt.Cache.Get(key) if ok && blk != nil { @@ -460,7 +519,9 @@ func (t *Table) block(idx int) (*block, error) { } } } - ko := t.blockIndex[idx] + + // Read the block index if it's nil + ko := t.blockOffsets()[idx] blk := &block{ offset: int(ko.Offset), ref: 1, @@ -518,7 +579,7 @@ func (t *Table) block(idx int) (*block, error) { return nil, err } } - if t.opt.Cache != nil { + if t.opt.Cache != nil && t.opt.KeepBlocksInCache { key := t.blockCacheKey(idx) // incrRef should never return false here because we're calling it on a // new block with ref=1. @@ -553,6 +614,15 @@ func (t *Table) blockCacheKey(idx int) []byte { return buf } +// blockOffsetsCacheKey returns the cache key for block offsets. +func (t *Table) blockOffsetsCacheKey() []byte { + y.AssertTrue(t.id < math.MaxUint32) + buf := make([]byte, 4, 6) + binary.BigEndian.PutUint32(buf, uint32(t.id)) + + return append([]byte("bo"), buf...) +} + // EstimatedSize returns the total size of key-values stored in this table (including the // disk space occupied on the value log). func (t *Table) EstimatedSize() uint64 { return t.estimatedSize } @@ -604,6 +674,14 @@ func (t *Table) DoesNotHave(hash uint64) bool { // along with the bloom filter. func (t *Table) readBloomFilter() (*z.Bloom, int) { // Read bloom filter from the SST. 
+ index := t.readTableIndex() + bf, err := z.JSONUnmarshal(index.BloomFilter) + y.Check(err) + return bf, len(index.BloomFilter) +} + +// readTableIndex reads table index from the sst and returns its pb format. +func (t *Table) readTableIndex() *pb.TableIndex { data := t.readNoFail(t.indexStart, t.indexLen) index := pb.TableIndex{} var err error @@ -613,16 +691,13 @@ func (t *Table) readBloomFilter() (*z.Bloom, int) { y.Check(err) } y.Check(proto.Unmarshal(data, &index)) - - bf, err := z.JSONUnmarshal(index.BloomFilter) - y.Check(err) - return bf, len(index.BloomFilter) + return &index } // VerifyChecksum verifies checksum for all blocks of table. This function is called by // OpenTable() function. This function is also called inside levelsController.VerifyChecksum(). func (t *Table) VerifyChecksum() error { - for i, os := range t.blockIndex { + for i, os := range t.blockOffsets() { b, err := t.block(i) if err != nil { return y.Wrapf(err, "checksum validation failed for table: %s, block: %d, offset:%d", diff --git a/table/table_test.go b/table/table_test.go index c85891dc1..a39d88e5a 100644 --- a/table/table_test.go +++ b/table/table_test.go @@ -31,6 +31,7 @@ import ( "github.com/cespare/xxhash" "github.com/dgraph-io/badger/v2/options" + "github.com/dgraph-io/badger/v2/pb" "github.com/dgraph-io/badger/v2/y" "github.com/dgraph-io/ristretto" "github.com/stretchr/testify/require" @@ -952,3 +953,22 @@ func TestDoesNotHaveRace(t *testing.T) { } wg.Wait() } + +var ko *pb.BlockOffset + +// Use this benchmark to manually verify block offset size calculation +func BenchmarkBlockOffsetSizeCalculation(b *testing.B) { + for i := 0; i < b.N; i++ { + ko = &pb.BlockOffset{ + Key: []byte{1, 23}, + } + } +} + +func TestBlockOffsetSizeCalculation(t *testing.T) { + // Empty struct testing. 
+ require.Equal(t, calculateOffsetsSize([]*pb.BlockOffset{&pb.BlockOffset{}}), int64(88)) + // Testing with key bytes + require.Equal(t, calculateOffsetsSize([]*pb.BlockOffset{&pb.BlockOffset{Key: []byte{1, 1}}}), + int64(90)) +} diff --git a/test.sh b/test.sh index 05c28ae88..06c9396c5 100755 --- a/test.sh +++ b/test.sh @@ -38,9 +38,9 @@ go test -v -race $packages echo echo "==> Starting tests with value log mmapped..." # Run top level package tests with mmap flag. -go test -timeout=15m -v -race github.com/dgraph-io/badger/v2 --vlog_mmap=true +go test -timeout=25m -v -race github.com/dgraph-io/badger/v2 --vlog_mmap=true echo echo "==> Starting tests with value log not mmapped..." -go test -timeout=15m -v -race github.com/dgraph-io/badger/v2 --vlog_mmap=false +go test -timeout=25m -v -race github.com/dgraph-io/badger/v2 --vlog_mmap=false