Improvements: Manual Memory allocation via Calloc (#1459)
This PR contains a bunch of improvements:
1. This PR reverts the following reverts (adds the original changes back):
    - Revert "Compress/Encrypt Blocks in the background" 6001230
    - Revert "Buffer pool for decompression" 800305e
    - Revert "fix: Fix race condition in block.incRef" 63d9309
    - Revert "add assert to check integer overflow for table size" e0d058c
2. Calloc and Free for memory allocation
    - The block buffers for decompression and encryption are now allocated using calloc and free.
      The `y/calloc.go` file contains the code; a hedged sketch of what such an allocator can look like follows this list.

The compaction changes are moved to a separate PR (#1466) for easier cherry-picking.
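For context, below is a minimal sketch of what a cgo-backed Calloc/Free pair along these lines can look like. This is an illustration, not the contents of `y/calloc.go` from this commit: the names `Calloc`, `Free`, and a byte-counting `NumAllocs` mirror identifiers that appear elsewhere in this diff, but the real file may differ (for example, it would typically guard the cgo path behind a build tag and ship a pure-Go fallback).

```go
package y

/*
#include <stdlib.h>
*/
import "C"

import (
	"sync/atomic"
	"unsafe"
)

// NumAllocs tracks the number of bytes currently handed out by Calloc. The
// Debugf call in levels.go below reads it and reports the value in MB.
var NumAllocs int64

// Calloc returns a zeroed byte slice of length n backed by C memory, so the
// buffer is invisible to the Go garbage collector. It must be released with
// Free; the GC will never reclaim it.
func Calloc(n int) []byte {
	if n == 0 {
		return nil
	}
	ptr := C.calloc(C.size_t(n), 1)
	if ptr == nil {
		panic("Calloc: out of memory")
	}
	atomic.AddInt64(&NumAllocs, int64(n))
	// unsafe.Slice requires Go 1.17+; older code would use an array cast.
	return unsafe.Slice((*byte)(ptr), n)
}

// Free releases a buffer previously returned by Calloc. Passing a slice that
// came from the Go heap here is undefined behaviour, so callers must track
// where each buffer came from.
func Free(b []byte) {
	if len(b) == 0 {
		return
	}
	atomic.AddInt64(&NumAllocs, -int64(len(b)))
	C.free(unsafe.Pointer(&b[0]))
}
```

Because such buffers never enter the Go heap, the GC neither scans nor reclaims them, which is why the table code in this diff wires explicit eviction handlers and reference counts around the cached blocks.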
Ibrahim Jarif authored Aug 19, 2020
1 parent 0b8eb4c commit 5c1b4c8
Showing 25 changed files with 758 additions and 204 deletions.
29 changes: 26 additions & 3 deletions badger/cmd/bank.go
@@ -125,7 +125,7 @@ func toSlice(bal uint64) []byte {
}

func getBalance(txn *badger.Txn, account int) (uint64, error) {
item, err := txn.Get(key(account))
item, err := get(txn, key(account))
if err != nil {
return 0, err
}
@@ -197,14 +197,33 @@ func diff(a, b []account) string {

var errFailure = errors.New("test failed due to balance mismatch")

// get function will fetch the value for the key "k" either by using the
// txn.Get API or the iterator.Seek API.
func get(txn *badger.Txn, k []byte) (*badger.Item, error) {
if rand.Int()%2 == 0 {
return txn.Get(k)
}

iopt := badger.DefaultIteratorOptions
// PrefetchValues is expensive. We don't need it here.
iopt.PrefetchValues = false
it := txn.NewIterator(iopt)
defer it.Close()
it.Seek(k)
if it.Valid() {
return it.Item(), nil
}
return nil, badger.ErrKeyNotFound
}

// seekTotal retrieves the total of all accounts by seeking for each account key.
func seekTotal(txn *badger.Txn) ([]account, error) {
expected := uint64(numAccounts) * uint64(initialBal)
var accounts []account

var total uint64
for i := 0; i < numAccounts; i++ {
item, err := txn.Get(key(i))
item, err := get(txn, key(i))
if err != nil {
log.Printf("Error for account: %d. err=%v. key=%q\n", i, err, key(i))
return accounts, err
@@ -343,7 +362,11 @@ func runTest(cmd *cobra.Command, args []string) error {
WithNumMemtables(2).
// Do not GC any versions, because we need them for the disect..
WithNumVersionsToKeep(int(math.MaxInt32)).
WithValueThreshold(1) // Make all values go to value log
WithValueThreshold(1). // Make all values go to value log
WithCompression(options.ZSTD).
WithKeepL0InMemory(false).
WithMaxCacheSize(10 << 20)

if mmap {
opts = opts.WithTableLoadingMode(options.MemoryMap)
}
1 change: 0 additions & 1 deletion badger/cmd/write_bench.go
@@ -347,7 +347,6 @@ func reportStats(c *y.Closer, db *badger.DB) {
if showKeysCount {
showKeysStats(db)
}

// fetch directory contents
if showDir {
err := filepath.Walk(sstDir, func(path string, info os.FileInfo, err error) error {
11 changes: 9 additions & 2 deletions db.go
@@ -194,7 +194,7 @@ func (db *DB) replayFunction() func(Entry, valuePointer) error {
func Open(opt Options) (db *DB, err error) {
// It's okay to have zero compactors which will disable all compactions but
// we cannot have just one compactor otherwise we will end up with all data
// one level 2.
// on level 2.
if opt.NumCompactors == 1 {
return nil, errors.New("Cannot have 1 compactor. Need at least 2")
}
@@ -324,6 +324,12 @@ func Open(opt Options) (db *DB, err error) {
MaxCost: int64(float64(opt.MaxCacheSize) * 0.95),
BufferItems: 64,
Metrics: true,
OnEvict: func(i *ristretto.Item) {
table.BlockEvictHandler(i.Value)
},
OnReject: func(i *ristretto.Item) {
table.BlockEvictHandler(i.Value)
},
}
db.blockCache, err = ristretto.NewCache(&config)
if err != nil {
@@ -986,6 +992,7 @@ func buildL0Table(ft flushTask, bopts table.Options) []byte {
defer iter.Close()
b := table.NewTableBuilder(bopts)
defer b.Close()

var vp valuePointer
for iter.SeekToFirst(); iter.Valid(); iter.Next() {
if len(ft.dropPrefixes) > 0 && hasAnyPrefixes(iter.Key(), ft.dropPrefixes) {
@@ -997,7 +1004,7 @@ }
}
b.Add(iter.Key(), iter.Value(), vp.Len)
}
return b.Finish()
return b.Finish(true)
}

type flushTask struct {
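The OnEvict/OnReject handlers registered on the block cache above are what keep manually allocated block buffers from leaking: once ristretto drops or rejects an entry, the table package has to release the underlying Calloc'd memory itself. A rough sketch of what such a handler can look like follows; the real `table.BlockEvictHandler` and its block type are almost certainly shaped differently (the `block`, `data`, and `ref` names here are illustrative only).

```go
package table

import (
	"sync/atomic"

	"github.com/dgraph-io/badger/v2/y" // import path assumed for this sketch
)

// block is an illustrative stand-in for the table package's cached block: a
// decompressed (and decrypted) block whose data lives in Calloc'd memory.
type block struct {
	data []byte // buffer obtained from y.Calloc
	ref  int32  // reference count; the cache holds one reference while cached
}

// BlockEvictHandler is invoked from ristretto's OnEvict/OnReject callbacks. It
// drops the cache's reference and frees the C buffer once no reader holds the
// block any more.
func BlockEvictHandler(value interface{}) {
	b, ok := value.(*block)
	if !ok || b == nil {
		return
	}
	if atomic.AddInt32(&b.ref, -1) == 0 {
		y.Free(b.data)
		b.data = nil
	}
}
```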
11 changes: 4 additions & 7 deletions db2_test.go
11 changes: 4 additions & 7 deletions db2_test.go
@@ -547,7 +547,7 @@ func createTableWithRange(t *testing.T, db *DB, start, end int) *table.Table {
fd, err := y.CreateSyncedFile(table.NewFilename(fileID, db.opt.Dir), true)
require.NoError(t, err)

_, err = fd.Write(b.Finish())
_, err = fd.Write(b.Finish(false))
require.NoError(t, err, "unable to write to file")

tab, err := table.OpenTable(fd, bopts)
@@ -670,16 +670,13 @@ func TestL0GCBug(t *testing.T) {
// Simulate a crash by not closing db1 but releasing the locks.
if db1.dirLockGuard != nil {
require.NoError(t, db1.dirLockGuard.release())
db1.dirLockGuard = nil
}
if db1.valueDirGuard != nil {
require.NoError(t, db1.valueDirGuard.release())
db1.valueDirGuard = nil
}
for _, f := range db1.vlog.filesMap {
require.NoError(t, f.fd.Close())
}
require.NoError(t, db1.registry.Close())
require.NoError(t, db1.lc.close())
require.NoError(t, db1.manifest.close())
require.NoError(t, db1.Close())

db2, err := Open(opts)
require.NoError(t, err)
2 changes: 1 addition & 1 deletion go.mod
@@ -5,7 +5,7 @@ go 1.12
require (
github.com/DataDog/zstd v1.4.1
github.com/cespare/xxhash v1.1.0
github.com/dgraph-io/ristretto v0.0.3-0.20200630154024-f66de99634de
github.com/dgraph-io/ristretto v0.0.4-0.20200817124926-18e279725890
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2
github.com/dustin/go-humanize v1.0.0
github.com/golang/protobuf v1.3.1
4 changes: 2 additions & 2 deletions go.sum
@@ -13,8 +13,8 @@ github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwc
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgraph-io/ristretto v0.0.3-0.20200630154024-f66de99634de h1:t0UHb5vdojIDUqktM6+xJAfScFBsVpXZmqC9dsgJmeA=
github.com/dgraph-io/ristretto v0.0.3-0.20200630154024-f66de99634de/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E=
github.com/dgraph-io/ristretto v0.0.4-0.20200817124926-18e279725890 h1:/6pLcQq2GNdLPOotXztuLDXYRPraTIzZMPiJW8HzAwg=
github.com/dgraph-io/ristretto v0.0.4-0.20200817124926-18e279725890/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
8 changes: 4 additions & 4 deletions key_registry.go
@@ -159,7 +159,7 @@ func validRegistry(fp *os.File, encryptionKey []byte) error {
}
if len(encryptionKey) > 0 {
// Decrypting sanity text.
if eSanityText, err = y.XORBlock(eSanityText, encryptionKey, iv); err != nil {
if eSanityText, err = y.XORBlockAllocate(eSanityText, encryptionKey, iv); err != nil {
return y.Wrapf(err, "During validRegistry")
}
}
@@ -200,7 +200,7 @@ func (kri *keyRegistryIterator) next() (*pb.DataKey, error) {
}
if len(kri.encryptionKey) > 0 {
// Decrypt the key if the storage key exists.
if dataKey.Data, err = y.XORBlock(dataKey.Data, kri.encryptionKey, dataKey.Iv); err != nil {
if dataKey.Data, err = y.XORBlockAllocate(dataKey.Data, kri.encryptionKey, dataKey.Iv); err != nil {
return nil, y.Wrapf(err, "While decrypting datakey in keyRegistryIterator.next")
}
}
@@ -254,7 +254,7 @@ func WriteKeyRegistry(reg *KeyRegistry, opt KeyRegistryOptions) error {
eSanity := sanityText
if len(opt.EncryptionKey) > 0 {
var err error
eSanity, err = y.XORBlock(eSanity, opt.EncryptionKey, iv)
eSanity, err = y.XORBlockAllocate(eSanity, opt.EncryptionKey, iv)
if err != nil {
return y.Wrapf(err, "Error while encrpting sanity text in WriteKeyRegistry")
}
@@ -395,7 +395,7 @@ func storeDataKey(buf *bytes.Buffer, storageKey []byte, k *pb.DataKey) error {
return nil
}
var err error
k.Data, err = y.XORBlock(k.Data, storageKey, k.Iv)
k.Data, err = y.XORBlockAllocate(k.Data, storageKey, k.Iv)
return err
}
// In memory datakey will be plain text so encrypting before storing to the disk.
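The registry code above now calls `y.XORBlockAllocate` instead of `y.XORBlock`, which by its name is the variant that allocates its own output buffer (the registry's small sanity-text and data-key payloads have no need for Calloc'd destinations). Below is a minimal sketch of what such a helper can look like, assuming the AES-CTR construction Badger uses for encryption; the actual signature and error handling in `y` may differ.

```go
package y

import (
	"crypto/aes"
	"crypto/cipher"
)

// XORBlockAllocate XORs src with an AES-CTR keystream derived from key and iv
// (iv must be aes.BlockSize bytes) and writes the result into a freshly
// allocated Go slice. In CTR mode encryption and decryption are the same
// operation, so one helper serves both directions.
func XORBlockAllocate(src, key, iv []byte) ([]byte, error) {
	block, err := aes.NewCipher(key)
	if err != nil {
		return nil, err
	}
	dst := make([]byte, len(src))
	cipher.NewCTR(block, iv).XORKeyStream(dst, src)
	return dst, nil
}
```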
9 changes: 6 additions & 3 deletions levels.go
@@ -657,7 +657,7 @@ nextTable:
numKeys, numSkips, time.Since(timeStart))
if builder.Empty() {
// Cleanup builder resources:
builder.Finish()
builder.Finish(false)
builder.Close()
continue
}
@@ -677,7 +677,7 @@ nextTable:
return nil, errors.Wrapf(err, "While opening new table: %d", fileID)
}

if _, err := fd.Write(builder.Finish()); err != nil {
if _, err := fd.Write(builder.Finish(false)); err != nil {
return nil, errors.Wrapf(err, "Unable to write to file: %d", fileID)
}
tbl, err := table.OpenTable(fd, bopts)
@@ -688,7 +688,7 @@
var tbl *table.Table
var err error
if s.kv.opt.InMemory {
tbl, err = table.OpenInMemoryTable(builder.Finish(), fileID, &bopts)
tbl, err = table.OpenInMemoryTable(builder.Finish(true), fileID, &bopts)
} else {
tbl, err = build(fileID)
}
@@ -700,6 +700,9 @@

mu.Lock()
newTables = append(newTables, tbl)
num := atomic.LoadInt32(&table.NumBlocks)
allocs := float64(atomic.LoadInt64(&y.NumAllocs)) / float64((1 << 20))
s.kv.opt.Debugf("Num Blocks: %d. Num Allocs (MB): %.2f\n", num, allocs)
mu.Unlock()
}(builder)
}
4 changes: 2 additions & 2 deletions levels_test.go
@@ -49,7 +49,7 @@ func createAndOpen(db *DB, td []keyValVersion, level int) {
panic(err)
}

if _, err = fd.Write(b.Finish()); err != nil {
if _, err = fd.Write(b.Finish(false)); err != nil {
panic(err)
}
tab, err := table.OpenTable(fd, opts)
@@ -740,7 +740,7 @@ func createEmptyTable(db *DB) *table.Table {
b.Add(y.KeyWithTs([]byte("foo"), 1), y.ValueStruct{}, 0)

// Open table in memory to avoid adding changes to manifest file.
tab, err := table.OpenInMemoryTable(b.Finish(), db.lc.reserveFileID(), &opts)
tab, err := table.OpenInMemoryTable(b.Finish(true), db.lc.reserveFileID(), &opts)
if err != nil {
panic(err)
}
2 changes: 1 addition & 1 deletion manifest_test.go
@@ -155,7 +155,7 @@ func buildTable(t *testing.T, keyValues [][]string, bopts table.Options) *os.Fil
UserMeta: 0,
}, 0)
}
_, err = f.Write(b.Finish())
_, err = f.Write(b.Finish(false))
require.NoError(t, err, "unable to write to file.")
f.Close()
f, _ = y.OpenSyncedFile(filename, true)
29 changes: 15 additions & 14 deletions options.go
@@ -122,7 +122,7 @@ func DefaultOptions(path string) Options {
Dir: path,
ValueDir: path,
LevelOneSize: 256 << 20,
LevelSizeMultiplier: 10,
LevelSizeMultiplier: 15,
TableLoadingMode: options.MemoryMap,
ValueLogLoadingMode: options.MemoryMap,
// table.MemoryMap to mmap() the tables.
@@ -176,6 +176,7 @@ func DefaultOptions(path string) Options {

func buildTableOptions(opt Options) table.Options {
return table.Options{
TableSize: uint64(opt.MaxTableSize),
BlockSize: opt.BlockSize,
BloomFalsePositive: opt.BloomFalsePositive,
LoadBloomsOnOpen: opt.LoadBloomsOnOpen,
@@ -230,17 +231,6 @@ func (opt Options) WithValueDir(val string) Options {
return opt
}

// WithLoggingLevel returns a new Options value with logging level of the
// default logger set to the given value.
// LoggingLevel sets the level of logging. It should be one of DEBUG, INFO,
// WARNING or ERROR levels.
//
// The default value of LoggingLevel is INFO.
func (opt Options) WithLoggingLevel(val loggingLevel) Options {
opt.Logger = defaultLogger(val)
return opt
}

// WithSyncWrites returns a new Options value with SyncWrites set to the given value.
//
// When SyncWrites is true all writes are synced to disk. Setting this to false would achieve better
@@ -318,6 +308,17 @@ func (opt Options) WithLogger(val Logger) Options {
return opt
}

// WithLoggingLevel returns a new Options value with logging level of the
// default logger set to the given value.
// LoggingLevel sets the level of logging. It should be one of DEBUG, INFO,
// WARNING or ERROR levels.
//
// The default value of LoggingLevel is INFO.
func (opt Options) WithLoggingLevel(val loggingLevel) Options {
opt.Logger = defaultLogger(val)
return opt
}

// WithMaxTableSize returns a new Options value with MaxTableSize set to the given value.
//
// MaxTableSize sets the maximum size in bytes for each LSM table or file.
@@ -335,7 +336,7 @@ func (opt Options) WithMaxTableSize(val int64) Options {
// Once a level grows to be larger than this ratio allowed, the compaction process will be
// triggered.
//
// The default value of LevelSizeMultiplier is 10.
// The default value of LevelSizeMultiplier is 15.
func (opt Options) WithLevelSizeMultiplier(val int) Options {
opt.LevelSizeMultiplier = val
return opt
@@ -460,7 +461,7 @@ func (opt Options) WithValueLogMaxEntries(val uint32) Options {
// NumCompactors sets the number of compaction workers to run concurrently.
// Setting this to zero stops compactions, which could eventually cause writes to block forever.
//
// The default value of NumCompactors is 2. One is dedicated just for L0.
// The default value of NumCompactors is 2. One is dedicated just for L0 and L1.
func (opt Options) WithNumCompactors(val int) Options {
opt.NumCompactors = val
return opt
3 changes: 2 additions & 1 deletion stream_writer.go
@@ -377,6 +377,7 @@ func (w *sortedWriter) send(done bool) error {
return err
}
go func(builder *table.Builder) {
defer builder.Close()
err := w.createTable(builder)
w.throttle.Done(err)
}(w.builder)
@@ -410,7 +411,7 @@ func (w *sortedWriter) Done() error {
}

func (w *sortedWriter) createTable(builder *table.Builder) error {
data := builder.Finish()
data := builder.Finish(w.db.opt.InMemory)
if len(data) == 0 {
return nil
}