Skip to content
This repository has been archived by the owner on Dec 13, 2022. It is now read-only.

Commit

Permalink
Introduce block cache in badger (#1066)
Browse files Browse the repository at this point in the history
This commit adds ristretto to badger. All reads are done through
the block cache. The block cache holds blocks in a decompressed and
unencrypted state. The memory usage of the cache can be changed with
the `MaxCacheSize` option. The default size of the cache is 1 GB.
  • Loading branch information
Ibrahim Jarif authored Oct 21, 2019
1 parent 116a95e commit c63cb2c
Show file tree
Hide file tree
Showing 12 changed files with 123 additions and 25 deletions.
8 changes: 4 additions & 4 deletions badger/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func TestBackupRestore2(t *testing.T) {
s2Path := filepath.Join(tmpdir, "test2")
s3Path := filepath.Join(tmpdir, "test3")

db1, err := Open(DefaultOptions(s1Path))
db1, err := Open(getTestOptions(s1Path))
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -160,7 +160,7 @@ func TestBackupRestore2(t *testing.T) {
}
fmt.Println("backup1 length:", backup.Len())

db2, err := Open(DefaultOptions(s2Path))
db2, err := Open(getTestOptions(s2Path))
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -211,7 +211,7 @@ func TestBackupRestore2(t *testing.T) {
t.Fatal(err)
}
fmt.Println("backup2 length:", backup.Len())
db3, err := Open(DefaultOptions(s3Path))
db3, err := Open(getTestOptions(s3Path))
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -454,7 +454,7 @@ func TestBackupLoadIncremental(t *testing.T) {
require.True(t, bb.Len() > 0)

// restore
db2, err := Open(DefaultOptions(filepath.Join(tmpdir, "restore2")))
db2, err := Open(getTestOptions(filepath.Join(tmpdir, "restore2")))
if err != nil {
t.Fatal(err)
}
Expand Down
31 changes: 29 additions & 2 deletions badger/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/dgraph-io/badger/skl"
"github.com/dgraph-io/badger/table"
"github.com/dgraph-io/badger/y"
"github.com/dgraph-io/ristretto"
humanize "github.com/dustin/go-humanize"
"github.com/pkg/errors"
"golang.org/x/net/trace"
Expand Down Expand Up @@ -88,8 +89,9 @@ type DB struct {

orc *oracle

pub *publisher
registry *KeyRegistry
pub *publisher
registry *KeyRegistry
blockCache *ristretto.Cache
}

const (
Expand Down Expand Up @@ -277,6 +279,18 @@ func Open(opt Options) (db *DB, err error) {
elog = trace.NewEventLog("Badger", "DB")
}

config := ristretto.Config{
// Use 5% of cache memory for storing counters.
NumCounters: int64(float64(opt.MaxCacheSize) * 0.05 * 2),
MaxCost: int64(float64(opt.MaxCacheSize) * 0.95),
BufferItems: 64,
// Enable metrics once https://github.com/dgraph-io/ristretto/issues/92 is resolved.
Metrics: false,
}
cache, err := ristretto.NewCache(&config)
if err != nil {
return nil, errors.Wrap(err, "failed to create cache")
}
db = &DB{
imm: make([]*skl.Skiplist, 0, opt.NumMemtables),
flushChan: make(chan flushTask, opt.NumMemtables),
Expand All @@ -288,6 +302,7 @@ func Open(opt Options) (db *DB, err error) {
valueDirGuard: valueDirLockGuard,
orc: newOracle(opt),
pub: newPublisher(),
blockCache: cache,
}

krOpt := KeyRegistryOptions{
Expand Down Expand Up @@ -367,6 +382,14 @@ func Open(opt Options) (db *DB, err error) {
return db, nil
}

// CacheMetrics returns the metrics for the underlying block cache.
// It currently always returns nil: ristretto metrics remain disabled in
// badger until https://github.com/dgraph-io/ristretto/issues/92 is
// resolved, after which this will return db.blockCache.Metrics().
func (db *DB) CacheMetrics() *ristretto.Metrics {
	return nil
}

// Close closes a DB. It's crucial to call it to ensure all the pending updates make their way to
// disk. Calling DB.Close() multiple times would still only close the DB once.
func (db *DB) Close() error {
Expand Down Expand Up @@ -453,6 +476,7 @@ func (db *DB) close() (err error) {
db.elog.Printf("Waiting for closer")
db.closers.updateSize.SignalAndWait()
db.orc.Stop()
db.blockCache.Close()

db.elog.Finish()

Expand Down Expand Up @@ -909,6 +933,8 @@ func (db *DB) handleFlushTask(ft flushTask) error {
}
bopts := buildTableOptions(db.opt)
bopts.DataKey = dk
// Builder does not need cache but the same options are used for opening table.
bopts.Cache = db.blockCache
tableData := buildL0Table(ft, bopts)

fileID := db.lc.reserveFileID()
Expand Down Expand Up @@ -1447,6 +1473,7 @@ func (db *DB) dropAll() (func(), error) {
db.vhead = valuePointer{} // Zero it out.
db.lc.nextFileID = 1
db.opt.Infof("Deleted %d value log files. DropAll done.\n", num)
db.blockCache.Clear()
return resume, nil
}

Expand Down
5 changes: 4 additions & 1 deletion badger/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ func getTestOptions(dir string) Options {
opt := DefaultOptions(dir).
WithMaxTableSize(1 << 15). // Force more compaction.
WithLevelOneSize(4 << 15). // Force more compaction.
WithSyncWrites(false)
WithSyncWrites(false).
WithMaxCacheSize(10 << 20)
if !*mmap {
return opt.WithValueLogLoadingMode(options.FileIO)
}
Expand Down Expand Up @@ -1571,6 +1572,7 @@ func TestMinReadTs(t *testing.T) {
}

func TestGoroutineLeak(t *testing.T) {
time.Sleep(1 * time.Second)
before := runtime.NumGoroutine()
t.Logf("Num go: %d", before)
for i := 0; i < 12; i++ {
Expand Down Expand Up @@ -1602,6 +1604,7 @@ func TestGoroutineLeak(t *testing.T) {
require.Equal(t, true, updated)
})
}
time.Sleep(2 * time.Second)
require.Equal(t, before, runtime.NumGoroutine())
}

Expand Down
2 changes: 1 addition & 1 deletion badger/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/DataDog/zstd v1.4.1
github.com/cespare/xxhash v1.1.0
github.com/cespare/xxhash/v2 v2.1.0 // indirect
github.com/dgraph-io/ristretto v0.0.0-20190916120426-cd2835491e0e
github.com/dgraph-io/ristretto v0.0.0-20191010170704-2ba187ef9534
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2
github.com/dustin/go-humanize v1.0.0
github.com/golang/protobuf v1.3.1
Expand Down
2 changes: 2 additions & 0 deletions badger/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ github.com/dgraph-io/ristretto v0.0.0-20190903064322-eb48d2f7ca30 h1:FkdGlqxPjfH
github.com/dgraph-io/ristretto v0.0.0-20190903064322-eb48d2f7ca30/go.mod h1:UvZmzj8odp3S1nli6yEb1vLME8iJFBrRcw8rAJEiu9Q=
github.com/dgraph-io/ristretto v0.0.0-20190916120426-cd2835491e0e h1:6ryPDbNhRiZ7Hbh8G3IxUaw3utIzI2FZEP5FBl8x8n0=
github.com/dgraph-io/ristretto v0.0.0-20190916120426-cd2835491e0e/go.mod h1:3dYWm7+Szhwp+wbqGQ5w8o8H1eBkPJEjIEAR5fXCCfs=
github.com/dgraph-io/ristretto v0.0.0-20191010170704-2ba187ef9534 h1:9G6fVccQriMJu4nXwpwLDoy9y31t/KUSLAbPcoBgv+4=
github.com/dgraph-io/ristretto v0.0.0-20191010170704-2ba187ef9534/go.mod h1:edzKIzGvqUCMzhTVWbiTSe75zD9Xxq0GtSBtFmaUTZs=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
Expand Down
3 changes: 3 additions & 0 deletions badger/levels.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ func newLevelsController(db *DB, mf *Manifest) (*levelsController, error) {
// Set compression from table manifest.
topt.Compression = tf.Compression
topt.DataKey = dk
topt.Cache = db.blockCache
t, err := table.OpenTable(fd, topt)
if err != nil {
if strings.HasPrefix(err.Error(), "CHECKSUM_MISMATCH:") {
Expand Down Expand Up @@ -509,6 +510,8 @@ func (s *levelsController) compactBuildTables(
}
bopts := buildTableOptions(s.kv.opt)
bopts.DataKey = dk
// Builder does not need cache but the same options are used for opening table.
bopts.Cache = s.kv.blockCache
builder := table.NewTableBuilder(bopts)
var numKeys, numSkips uint64
for ; it.Valid(); it.Next() {
Expand Down
2 changes: 1 addition & 1 deletion badger/managed_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func TestDropAllManaged(t *testing.T) {
require.NoError(t, db.DropAll()) // Just call it twice, for fun.
require.Equal(t, 0, numKeysManaged(db, math.MaxUint64))

// Check that we can still write to mdb, and using lower timestamps.
// Check that we can still write to db, and using lower timestamps.
populate(db, 1)
require.Equal(t, int(N), numKeysManaged(db, math.MaxUint64))
require.NoError(t, db.Close())
Expand Down
19 changes: 16 additions & 3 deletions badger/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,12 @@ type Options struct {
MaxLevels int
ValueThreshold int
NumMemtables int
BlockSize int
BloomFalsePositive float64
KeepL0InMemory bool
// Changing BlockSize across DB runs will not break badger. The block size is
// read from the block index stored at the end of the table.
BlockSize int
BloomFalsePositive float64
KeepL0InMemory bool
MaxCacheSize int64

NumLevelZeroTables int
NumLevelZeroTablesStall int
Expand Down Expand Up @@ -118,6 +121,7 @@ func DefaultOptions(path string) Options {
KeepL0InMemory: true,
VerifyValueChecksum: false,
Compression: options.ZSTD,
MaxCacheSize: 1 << 30, // 1 GB
// Nothing to read/write value log using standard File I/O
// MemoryMap to mmap() the value log files
// (2^30 - 1)*2 when mmapping < 2^31 - 1, max int32.
Expand Down Expand Up @@ -499,3 +503,12 @@ func (opt Options) WithVerifyValueChecksum(val bool) Options {
opt.VerifyValueChecksum = val
return opt
}

// WithMaxCacheSize returns a new Options value with MaxCacheSize set to the given value.
//
// MaxCacheSize controls how much data the block cache may hold in memory. A
// smaller cache lowers memory consumption at the cost of slower lookups and
// iterations.
func (opt Options) WithMaxCacheSize(size int64) Options {
	updated := opt
	updated.MaxCacheSize = size
	return updated
}
1 change: 1 addition & 0 deletions badger/stream_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,7 @@ func (w *sortedWriter) createTable(builder *table.Builder) error {
}
opts := buildTableOptions(w.db.opt)
opts.DataKey = builder.DataKey()
opts.Cache = w.db.blockCache
tbl, err := table.OpenTable(fd, opts)
if err != nil {
return err
Expand Down
12 changes: 6 additions & 6 deletions badger/stream_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ func getSortedKVList(valueSize, listSize int) *pb.KVList {

// check if we can read values after writing using stream writer
func TestStreamWriter1(t *testing.T) {
normalModeOpts := DefaultOptions("")
managedModeOpts := DefaultOptions("")
normalModeOpts := getTestOptions("")
managedModeOpts := getTestOptions("")
managedModeOpts.managedTxns = true

for _, opts := range []*Options{&normalModeOpts, &managedModeOpts} {
Expand Down Expand Up @@ -92,8 +92,8 @@ func TestStreamWriter1(t *testing.T) {

// write more keys to db after writing keys using stream writer
func TestStreamWriter2(t *testing.T) {
normalModeOpts := DefaultOptions("")
managedModeOpts := DefaultOptions("")
normalModeOpts := getTestOptions("")
managedModeOpts := getTestOptions("")
managedModeOpts.managedTxns = true

for _, opts := range []*Options{&normalModeOpts, &managedModeOpts} {
Expand Down Expand Up @@ -144,8 +144,8 @@ func TestStreamWriter2(t *testing.T) {
}

func TestStreamWriter3(t *testing.T) {
normalModeOpts := DefaultOptions("")
managedModeOpts := DefaultOptions("")
normalModeOpts := getTestOptions("")
managedModeOpts := getTestOptions("")
managedModeOpts.managedTxns = true

for _, opts := range []*Options{&normalModeOpts, &managedModeOpts} {
Expand Down
31 changes: 29 additions & 2 deletions badger/table/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@ import (
"crypto/aes"
"fmt"
"io"
"math"
"os"
"path"
"path/filepath"
"strconv"
"strings"
"sync"
"sync/atomic"
"unsafe"

"github.com/DataDog/zstd"
"github.com/golang/protobuf/proto"
Expand All @@ -36,10 +38,12 @@ import (
"github.com/dgraph-io/badger/options"
"github.com/dgraph-io/badger/pb"
"github.com/dgraph-io/badger/y"
"github.com/dgraph-io/ristretto"
"github.com/dgraph-io/ristretto/z"
)

const fileSuffix = ".sst"
const intSize = int(unsafe.Sizeof(int(0)))

// Options contains configurable options for Table/Builder.
type Options struct {
Expand All @@ -64,6 +68,8 @@ type Options struct {

// Compression indicates the compression algorithm used for block compression.
Compression options.CompressionType

Cache *ristretto.Cache
}

// TableInterface is useful for testing.
Expand Down Expand Up @@ -149,6 +155,11 @@ type block struct {
chkLen int // checksum length
}

// size reports the approximate in-memory footprint of the block, used as the
// cost when inserting the block into the cache.
func (b *block) size() int64 {
	sz := 3 * intSize // the offset, entriesIndexStart and chkLen int fields
	sz += cap(b.data)
	sz += cap(b.checksum)
	sz += cap(b.entryOffsets) * 4 // entryOffsets holds uint32s, 4 bytes each
	return int64(sz)
}

func (b block) verifyCheckSum() error {
cs := &pb.Checksum{}
if err := proto.Unmarshal(b.checksum, cs); err != nil {
Expand Down Expand Up @@ -348,7 +359,13 @@ func (t *Table) block(idx int) (*block, error) {
if idx >= len(t.blockIndex) {
return nil, errors.New("block out of index")
}

if t.opt.Cache != nil {
key := t.blockCacheKey(idx)
blk, ok := t.opt.Cache.Get(key)
if ok && blk != nil {
return blk.(*block), nil
}
}
ko := t.blockIndex[idx]
blk := &block{
offset: int(ko.Offset),
Expand Down Expand Up @@ -407,7 +424,17 @@ func (t *Table) block(idx int) (*block, error) {
return nil, err
}
}
return blk, err
if t.opt.Cache != nil {
key := t.blockCacheKey(idx)
t.opt.Cache.Set(key, blk, blk.size())
}
return blk, nil
}

// blockCacheKey returns the cache key for the block at the given index. The
// table ID occupies the upper 32 bits and the block index the lower 32 bits,
// so keys are unique across all tables sharing one cache.
func (t *Table) blockCacheKey(idx int) uint64 {
	y.AssertTrue(t.ID() < math.MaxUint32)
	// Convert idx before comparing: the untyped constant math.MaxUint32
	// overflows int on 32-bit platforms and would not compile there. Also
	// reject negative indices, whose uint64 conversion would silently wrap
	// into a bogus, colliding key.
	y.AssertTrue(idx >= 0 && uint64(idx) < math.MaxUint32)
	return (t.ID() << 32) | uint64(idx)
}

// Size is its file size in bytes
Expand Down
Loading

0 comments on commit c63cb2c

Please sign in to comment.