From c63cb2ce526c8133fba2f2ed2272a5d57d694c4a Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Mon, 21 Oct 2019 16:19:09 +0530 Subject: [PATCH] Introduce block cache in badger (#1066) This commit adds ristretto to badger. All the reads are done through the block cache. The block cache holds blocks in a decompressed and unencrypted state. The memory usage of cache can be changed by `MaxCacheSize` option. The default size of the cache is 1 GB. --- badger/backup_test.go | 8 ++++---- badger/db.go | 31 +++++++++++++++++++++++++++++-- badger/db_test.go | 5 ++++- badger/go.mod | 2 +- badger/go.sum | 2 ++ badger/levels.go | 3 +++ badger/managed_db_test.go | 2 +- badger/options.go | 19 ++++++++++++++++--- badger/stream_writer.go | 1 + badger/stream_writer_test.go | 12 ++++++------ badger/table/table.go | 31 +++++++++++++++++++++++++++++-- badger/table/table_test.go | 32 +++++++++++++++++++++++++++----- 12 files changed, 123 insertions(+), 25 deletions(-) diff --git a/badger/backup_test.go b/badger/backup_test.go index d2fd83972..fc8d2b8db 100644 --- a/badger/backup_test.go +++ b/badger/backup_test.go @@ -125,7 +125,7 @@ func TestBackupRestore2(t *testing.T) { s2Path := filepath.Join(tmpdir, "test2") s3Path := filepath.Join(tmpdir, "test3") - db1, err := Open(DefaultOptions(s1Path)) + db1, err := Open(getTestOptions(s1Path)) if err != nil { t.Fatal(err) } @@ -160,7 +160,7 @@ func TestBackupRestore2(t *testing.T) { } fmt.Println("backup1 length:", backup.Len()) - db2, err := Open(DefaultOptions(s2Path)) + db2, err := Open(getTestOptions(s2Path)) if err != nil { t.Fatal(err) } @@ -211,7 +211,7 @@ func TestBackupRestore2(t *testing.T) { t.Fatal(err) } fmt.Println("backup2 length:", backup.Len()) - db3, err := Open(DefaultOptions(s3Path)) + db3, err := Open(getTestOptions(s3Path)) if err != nil { t.Fatal(err) } @@ -454,7 +454,7 @@ func TestBackupLoadIncremental(t *testing.T) { require.True(t, bb.Len() > 0) // restore - db2, err := Open(DefaultOptions(filepath.Join(tmpdir, "restore2"))) + db2, err := Open(getTestOptions(filepath.Join(tmpdir, "restore2"))) if err != nil { t.Fatal(err) } diff --git a/badger/db.go b/badger/db.go index 30a3725fc..016e10826 100644 --- a/badger/db.go +++ b/badger/db.go @@ -35,6 +35,7 @@ import ( "github.com/dgraph-io/badger/skl" "github.com/dgraph-io/badger/table" "github.com/dgraph-io/badger/y" + "github.com/dgraph-io/ristretto" humanize "github.com/dustin/go-humanize" "github.com/pkg/errors" "golang.org/x/net/trace" @@ -88,8 +89,9 @@ type DB struct { orc *oracle - pub *publisher - registry *KeyRegistry + pub *publisher + registry *KeyRegistry + blockCache *ristretto.Cache } const ( @@ -277,6 +279,18 @@ func Open(opt Options) (db *DB, err error) { elog = trace.NewEventLog("Badger", "DB") } + config := ristretto.Config{ + // Use 5% of cache memory for storing counters. + NumCounters: int64(float64(opt.MaxCacheSize) * 0.05 * 2), + MaxCost: int64(float64(opt.MaxCacheSize) * 0.95), + BufferItems: 64, + // Enable metrics once https://github.com/dgraph-io/ristretto/issues/92 is resolved. + Metrics: false, + } + cache, err := ristretto.NewCache(&config) + if err != nil { + return nil, errors.Wrap(err, "failed to create cache") + } db = &DB{ imm: make([]*skl.Skiplist, 0, opt.NumMemtables), flushChan: make(chan flushTask, opt.NumMemtables), @@ -288,6 +302,7 @@ func Open(opt Options) (db *DB, err error) { valueDirGuard: valueDirLockGuard, orc: newOracle(opt), pub: newPublisher(), + blockCache: cache, } krOpt := KeyRegistryOptions{ @@ -367,6 +382,14 @@ func Open(opt Options) (db *DB, err error) { return db, nil } +// CacheMetrics returns the metrics for the underlying cache. +func (db *DB) CacheMetrics() *ristretto.Metrics { + return nil + // Do not enable ristretto metrics in badger until issue + // https://github.com/dgraph-io/ristretto/issues/92 is resolved. + // return db.blockCache.Metrics() +} + // Close closes a DB. It's crucial to call it to ensure all the pending updates make their way to // disk. Calling DB.Close() multiple times would still only close the DB once. func (db *DB) Close() error { @@ -453,6 +476,7 @@ func (db *DB) close() (err error) { db.elog.Printf("Waiting for closer") db.closers.updateSize.SignalAndWait() db.orc.Stop() + db.blockCache.Close() db.elog.Finish() @@ -909,6 +933,8 @@ func (db *DB) handleFlushTask(ft flushTask) error { } bopts := buildTableOptions(db.opt) bopts.DataKey = dk + // Builder does not need cache but the same options are used for opening table. + bopts.Cache = db.blockCache tableData := buildL0Table(ft, bopts) fileID := db.lc.reserveFileID() @@ -1447,6 +1473,7 @@ func (db *DB) dropAll() (func(), error) { db.vhead = valuePointer{} // Zero it out. db.lc.nextFileID = 1 db.opt.Infof("Deleted %d value log files. DropAll done.\n", num) + db.blockCache.Clear() return resume, nil } diff --git a/badger/db_test.go b/badger/db_test.go index 1d88a7e3b..01cb092ff 100644 --- a/badger/db_test.go +++ b/badger/db_test.go @@ -74,7 +74,8 @@ func getTestOptions(dir string) Options { opt := DefaultOptions(dir). WithMaxTableSize(1 << 15). // Force more compaction. WithLevelOneSize(4 << 15). // Force more compaction. - WithSyncWrites(false) + WithSyncWrites(false). + WithMaxCacheSize(10 << 20) if !*mmap { return opt.WithValueLogLoadingMode(options.FileIO) } @@ -1571,6 +1572,7 @@ func TestMinReadTs(t *testing.T) { } func TestGoroutineLeak(t *testing.T) { + time.Sleep(1 * time.Second) before := runtime.NumGoroutine() t.Logf("Num go: %d", before) for i := 0; i < 12; i++ { @@ -1602,6 +1604,7 @@ func TestGoroutineLeak(t *testing.T) { require.Equal(t, true, updated) }) } + time.Sleep(2 * time.Second) require.Equal(t, before, runtime.NumGoroutine()) } diff --git a/badger/go.mod b/badger/go.mod index c52fb3ace..5b9b05c72 100644 --- a/badger/go.mod +++ b/badger/go.mod @@ -6,7 +6,7 @@ require ( github.com/DataDog/zstd v1.4.1 github.com/cespare/xxhash v1.1.0 github.com/cespare/xxhash/v2 v2.1.0 // indirect - github.com/dgraph-io/ristretto v0.0.0-20190916120426-cd2835491e0e + github.com/dgraph-io/ristretto v0.0.0-20191010170704-2ba187ef9534 github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 github.com/dustin/go-humanize v1.0.0 github.com/golang/protobuf v1.3.1 diff --git a/badger/go.sum b/badger/go.sum index 4a485aafa..5ce15406f 100644 --- a/badger/go.sum +++ b/badger/go.sum @@ -24,6 +24,8 @@ github.com/dgraph-io/ristretto v0.0.0-20190903064322-eb48d2f7ca30 h1:FkdGlqxPjfH github.com/dgraph-io/ristretto v0.0.0-20190903064322-eb48d2f7ca30/go.mod h1:UvZmzj8odp3S1nli6yEb1vLME8iJFBrRcw8rAJEiu9Q= github.com/dgraph-io/ristretto v0.0.0-20190916120426-cd2835491e0e h1:6ryPDbNhRiZ7Hbh8G3IxUaw3utIzI2FZEP5FBl8x8n0= github.com/dgraph-io/ristretto v0.0.0-20190916120426-cd2835491e0e/go.mod h1:3dYWm7+Szhwp+wbqGQ5w8o8H1eBkPJEjIEAR5fXCCfs= +github.com/dgraph-io/ristretto v0.0.0-20191010170704-2ba187ef9534 h1:9G6fVccQriMJu4nXwpwLDoy9y31t/KUSLAbPcoBgv+4= +github.com/dgraph-io/ristretto v0.0.0-20191010170704-2ba187ef9534/go.mod h1:edzKIzGvqUCMzhTVWbiTSe75zD9Xxq0GtSBtFmaUTZs= github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA= github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= diff --git a/badger/levels.go b/badger/levels.go index 8b278416f..1ef9da7dc 100644 --- a/badger/levels.go +++ b/badger/levels.go @@ -158,6 +158,7 @@ func newLevelsController(db *DB, mf *Manifest) (*levelsController, error) { // Set compression from table manifest. topt.Compression = tf.Compression topt.DataKey = dk + topt.Cache = db.blockCache t, err := table.OpenTable(fd, topt) if err != nil { if strings.HasPrefix(err.Error(), "CHECKSUM_MISMATCH:") { @@ -509,6 +510,8 @@ func (s *levelsController) compactBuildTables( } bopts := buildTableOptions(s.kv.opt) bopts.DataKey = dk + // Builder does not need cache but the same options are used for opening table. + bopts.Cache = s.kv.blockCache builder := table.NewTableBuilder(bopts) var numKeys, numSkips uint64 for ; it.Valid(); it.Next() { diff --git a/badger/managed_db_test.go b/badger/managed_db_test.go index 93e3a1de3..3ef8f7b1a 100644 --- a/badger/managed_db_test.go +++ b/badger/managed_db_test.go @@ -87,7 +87,7 @@ func TestDropAllManaged(t *testing.T) { require.NoError(t, db.DropAll()) // Just call it twice, for fun. require.Equal(t, 0, numKeysManaged(db, math.MaxUint64)) - // Check that we can still write to mdb, and using lower timestamps. + // Check that we can still write to db, and using lower timestamps. populate(db, 1) require.Equal(t, int(N), numKeysManaged(db, math.MaxUint64)) require.NoError(t, db.Close()) diff --git a/badger/options.go b/badger/options.go index fd14b0279..c88e298bc 100644 --- a/badger/options.go +++ b/badger/options.go @@ -57,9 +57,12 @@ type Options struct { MaxLevels int ValueThreshold int NumMemtables int - BlockSize int - BloomFalsePositive float64 - KeepL0InMemory bool + // Changing BlockSize across DB runs will not break badger. The block size is + // read from the block index stored at the end of the table. + BlockSize int + BloomFalsePositive float64 + KeepL0InMemory bool + MaxCacheSize int64 NumLevelZeroTables int NumLevelZeroTablesStall int @@ -118,6 +121,7 @@ func DefaultOptions(path string) Options { KeepL0InMemory: true, VerifyValueChecksum: false, Compression: options.ZSTD, + MaxCacheSize: 1 << 30, // 1 GB // Nothing to read/write value log using standard File I/O // MemoryMap to mmap() the value log files // (2^30 - 1)*2 when mmapping < 2^31 - 1, max int32. @@ -499,3 +503,12 @@ func (opt Options) WithVerifyValueChecksum(val bool) Options { opt.VerifyValueChecksum = val return opt } + +// WithMaxCacheSize returns a new Options value with MaxCacheSize set to the given value. +// +// This value specifies how much data cache should hold in memory. A small size of cache means lower +// memory consumption and lookups/iterations would take longer. +func (opt Options) WithMaxCacheSize(size int64) Options { + opt.MaxCacheSize = size + return opt +} diff --git a/badger/stream_writer.go b/badger/stream_writer.go index b59eb9bb3..7bea6568a 100644 --- a/badger/stream_writer.go +++ b/badger/stream_writer.go @@ -410,6 +410,7 @@ func (w *sortedWriter) createTable(builder *table.Builder) error { } opts := buildTableOptions(w.db.opt) opts.DataKey = builder.DataKey() + opts.Cache = w.db.blockCache tbl, err := table.OpenTable(fd, opts) if err != nil { return err diff --git a/badger/stream_writer_test.go b/badger/stream_writer_test.go index 332d42387..7b7300a20 100644 --- a/badger/stream_writer_test.go +++ b/badger/stream_writer_test.go @@ -51,8 +51,8 @@ func getSortedKVList(valueSize, listSize int) *pb.KVList { // check if we can read values after writing using stream writer func TestStreamWriter1(t *testing.T) { - normalModeOpts := DefaultOptions("") - managedModeOpts := DefaultOptions("") + normalModeOpts := getTestOptions("") + managedModeOpts := getTestOptions("") managedModeOpts.managedTxns = true for _, opts := range []*Options{&normalModeOpts, &managedModeOpts} { @@ -92,8 +92,8 @@ func TestStreamWriter1(t *testing.T) { // write more keys to db after writing keys using stream writer func TestStreamWriter2(t *testing.T) { - normalModeOpts := DefaultOptions("") - managedModeOpts := DefaultOptions("") + normalModeOpts := getTestOptions("") + managedModeOpts := getTestOptions("") managedModeOpts.managedTxns = true for _, opts := range []*Options{&normalModeOpts, &managedModeOpts} { @@ -144,8 +144,8 @@ func TestStreamWriter2(t *testing.T) { } func TestStreamWriter3(t *testing.T) { - normalModeOpts := DefaultOptions("") - managedModeOpts := DefaultOptions("") + normalModeOpts := getTestOptions("") + managedModeOpts := getTestOptions("") managedModeOpts.managedTxns = true for _, opts := range []*Options{&normalModeOpts, &managedModeOpts} { diff --git a/badger/table/table.go b/badger/table/table.go index 7c0b89118..bb1861905 100644 --- a/badger/table/table.go +++ b/badger/table/table.go @@ -20,6 +20,7 @@ import ( "crypto/aes" "fmt" "io" + "math" "os" "path" "path/filepath" @@ -27,6 +28,7 @@ import ( "strings" "sync" "sync/atomic" + "unsafe" "github.com/DataDog/zstd" "github.com/golang/protobuf/proto" @@ -36,10 +38,12 @@ import ( "github.com/dgraph-io/badger/options" "github.com/dgraph-io/badger/pb" "github.com/dgraph-io/badger/y" + "github.com/dgraph-io/ristretto" "github.com/dgraph-io/ristretto/z" ) const fileSuffix = ".sst" +const intSize = int(unsafe.Sizeof(int(0))) // Options contains configurable options for Table/Builder. type Options struct { @@ -64,6 +68,8 @@ type Options struct { // Compression indicates the compression algorithm used for block compression. Compression options.CompressionType + + Cache *ristretto.Cache } // TableInterface is useful for testing. @@ -149,6 +155,11 @@ type block struct { chkLen int // checksum length } +func (b *block) size() int64 { + return int64(3*intSize /* Size of the offset, entriesIndexStart and chkLen */ + + cap(b.data) + cap(b.checksum) + cap(b.entryOffsets)*4) +} + func (b block) verifyCheckSum() error { cs := &pb.Checksum{} if err := proto.Unmarshal(b.checksum, cs); err != nil { @@ -348,7 +359,13 @@ func (t *Table) block(idx int) (*block, error) { if idx >= len(t.blockIndex) { return nil, errors.New("block out of index") } - + if t.opt.Cache != nil { + key := t.blockCacheKey(idx) + blk, ok := t.opt.Cache.Get(key) + if ok && blk != nil { + return blk.(*block), nil + } + } ko := t.blockIndex[idx] blk := &block{ offset: int(ko.Offset), @@ -407,7 +424,17 @@ func (t *Table) block(idx int) (*block, error) { return nil, err } } - return blk, err + if t.opt.Cache != nil { + key := t.blockCacheKey(idx) + t.opt.Cache.Set(key, blk, blk.size()) + } + return blk, nil +} + +func (t *Table) blockCacheKey(idx int) uint64 { + y.AssertTrue(t.ID() < math.MaxUint32) + y.AssertTrue(idx < math.MaxUint32) + return (t.ID() << 32) | uint64(idx) } // Size is its file size in bytes diff --git a/badger/table/table_test.go b/badger/table/table_test.go index cbb120fa6..8c8de96b4 100644 --- a/badger/table/table_test.go +++ b/badger/table/table_test.go @@ -31,6 +31,7 @@ import ( "github.com/cespare/xxhash" "github.com/dgraph-io/badger/options" "github.com/dgraph-io/badger/y" + "github.com/dgraph-io/ristretto" "github.com/stretchr/testify/require" ) @@ -47,7 +48,6 @@ func getTestTableOptions() Options { return Options{ Compression: options.ZSTD, LoadingMode: options.LoadToRAM, - ChkMode: options.OnTableAndBlockRead, BlockSize: 4 * 1024, BloomFalsePositive: 0.01, } @@ -738,6 +738,7 @@ func TestTableChecksum(t *testing.T) { rb := make([]byte, 100) rand.Read(rb) opts := getTestTableOptions() + opts.ChkMode = options.OnTableAndBlockRead f := buildTestTable(t, "k", 10000, opts) fi, err := f.Stat() require.NoError(t, err, "unable to get file information") @@ -749,9 +750,16 @@ func TestTableChecksum(t *testing.T) { } } +var cacheConfig = ristretto.Config{ + NumCounters: 1000000 * 10, + MaxCost: 1000000, + BufferItems: 64, + Metrics: true, +} + func BenchmarkRead(b *testing.B) { n := int(5 * 1e6) - tbl := getTableForBenchmarks(b, n) + tbl := getTableForBenchmarks(b, n, nil) defer tbl.DecrRef() b.ResetTimer() @@ -768,7 +776,9 @@ func BenchmarkRead(b *testing.B) { func BenchmarkReadAndBuild(b *testing.B) { n := int(5 * 1e6) - tbl := getTableForBenchmarks(b, n) + + var cache, _ = ristretto.NewCache(&cacheConfig) + tbl := getTableForBenchmarks(b, n, cache) defer tbl.DecrRef() b.ResetTimer() @@ -776,6 +786,7 @@ func BenchmarkReadAndBuild(b *testing.B) { for i := 0; i < b.N; i++ { func() { opts := Options{Compression: options.ZSTD, BlockSize: 4 * 0124, BloomFalsePositive: 0.01} + opts.Cache = cache newBuilder := NewTableBuilder(opts) it := tbl.NewIterator(false) defer it.Close() @@ -794,9 +805,14 @@ func BenchmarkReadMerged(b *testing.B) { y.AssertTrue((n % m) == 0) tableSize := n / m var tables []*Table + + var cache, err = ristretto.NewCache(&cacheConfig) + require.NoError(b, err) + for i := 0; i < m; i++ { filename := fmt.Sprintf("%s%s%d.sst", os.TempDir(), string(os.PathSeparator), rand.Int63()) opts := Options{Compression: options.ZSTD, BlockSize: 4 * 1024, BloomFalsePositive: 0.01} + opts.Cache = cache builder := NewTableBuilder(opts) f, err := y.OpenSyncedFile(filename, true) y.Check(err) @@ -855,7 +871,7 @@ func BenchmarkChecksum(b *testing.B) { func BenchmarkRandomRead(b *testing.B) { n := int(5 * 1e6) - tbl := getTableForBenchmarks(b, n) + tbl := getTableForBenchmarks(b, n, nil) defer tbl.DecrRef() r := rand.New(rand.NewSource(time.Now().Unix())) @@ -880,9 +896,15 @@ func BenchmarkRandomRead(b *testing.B) { } } -func getTableForBenchmarks(b *testing.B, count int) *Table { +func getTableForBenchmarks(b *testing.B, count int, cache *ristretto.Cache) *Table { rand.Seed(time.Now().Unix()) opts := Options{Compression: options.ZSTD, BlockSize: 4 * 1024, BloomFalsePositive: 0.01} + if cache == nil { + var err error + cache, err = ristretto.NewCache(&cacheConfig) + require.NoError(b, err) + } + opts.Cache = cache builder := NewTableBuilder(opts) filename := fmt.Sprintf("%s%s%d.sst", os.TempDir(), string(os.PathSeparator), rand.Int63()) f, err := y.OpenSyncedFile(filename, true)