opt(builder): Use z.Allocator for building tables (#1576)
Avoid Go memory spikes by using z.Allocator to allocate blocks during table generation. Also avoid copying data unless necessary -- instead, write it directly to the mmapped file.

With these changes, we can load 1 billion randomly generated 32-byte keys with 128-byte values in 1.5 hours, with memory usage peaking at 5 GB.

Co-authored-by: Manish R Jain <[email protected]>
Co-authored-by: Ibrahim Jarif <[email protected]>
Authored by 3 people on Nov 3, 2020
1 parent b09274a commit e2e08f9
Showing 19 changed files with 310 additions and 359 deletions.
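Before the individual diffs, a minimal sketch of the allocation pattern the commit message describes: table blocks are carved out of a manually managed z.Allocator arena and released in one shot, so they never land on the Go heap. This assumes the ristretto/z API as pinned by this change (NewAllocator / Allocate / Release); the constructor signature is an assumption (later z releases add a tag argument), and the sizes are purely illustrative.

```go
package main

import (
	"fmt"

	"github.com/dgraph-io/ristretto/z"
)

// buildBlocks mimics a table builder taking its block buffers from an arena
// instead of the Go heap. Everything allocated here is returned in a single
// Release call, so the Go GC never has to track or scan these buffers.
func buildBlocks(numBlocks, blockSize int) {
	alloc := z.NewAllocator(numBlocks * blockSize)
	defer alloc.Release()

	for i := 0; i < numBlocks; i++ {
		block := alloc.Allocate(blockSize) // instead of make([]byte, blockSize)
		copy(block, "key/value entries, offsets, checksum ...")
	}
	fmt.Printf("filled %d blocks of %d bytes from one arena\n", numBlocks, blockSize)
}

func main() {
	buildBlocks(256, 4<<10)
}
```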
2 changes: 1 addition & 1 deletion badger/cmd/write_bench.go
@@ -391,7 +391,7 @@ func reportStats(c *z.Closer, db *badger.DB) {
bytesRate := sz / uint64(dur.Seconds())
entriesRate := entries / uint64(dur.Seconds())
fmt.Printf("[WRITE] Time elapsed: %s, bytes written: %s, speed: %s/sec, "+
"entries written: %d, speed: %d/sec, Memory: %s\n",
"entries written: %d, speed: %d/sec, jemalloc: %s\n",
y.FixedDuration(time.Since(startTime)),
humanize.Bytes(sz), humanize.Bytes(bytesRate), entries, entriesRate,
humanize.IBytes(uint64(z.NumAllocBytes())))
1 change: 1 addition & 0 deletions badger/main.go
@@ -49,6 +49,7 @@ func main() {
z.Free(out)

cmd.Execute()
z.Done()
fmt.Printf("Num Allocated Bytes at program end: %s\n",
humanize.IBytes(uint64(z.NumAllocBytes())))
if z.NumAllocBytes() > 0 {
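The main.go hunk above pairs a new z.Done() call with the existing z.NumAllocBytes() leak check at exit. Below is a small, hypothetical standalone version of that check. What z.Done() does internally is not visible in this diff, so only its placement (right before reading the counter) is copied here; Calloc/Free are used without a tag argument as in the z version this commit pins, and the counter is only meaningful in builds where z actually tracks manual allocations (e.g. with the jemalloc build tag).

```go
package main

import (
	"fmt"
	"os"

	"github.com/dgraph-io/ristretto/z"
)

func main() {
	buf := z.Calloc(512) // manually managed memory, invisible to the Go GC
	copy(buf, "scratch space for some work")
	z.Free(buf) // dropping this line is the kind of leak the exit check catches

	z.Done() // as in badger/main.go above: called before the final accounting check
	if leaked := z.NumAllocBytes(); leaked > 0 {
		fmt.Fprintf(os.Stderr, "%d manually allocated bytes were never freed\n", leaked)
		os.Exit(1)
	}
}
```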
3 changes: 3 additions & 0 deletions batch_test.go
@@ -80,13 +80,16 @@ func TestWriteBatch(t *testing.T) {
runBadgerTest(t, &opt, func(t *testing.T, db *DB) {
test(t, db)
})
t.Logf("Disk mode done\n")
})
t.Run("InMemory mode", func(t *testing.T) {
t.Skipf("TODO(ibrahim): Please fix this")
opt := getTestOptions("")
opt.InMemory = true
db, err := Open(opt)
require.NoError(t, err)
test(t, db)
t.Logf("Disk mode done\n")
require.NoError(t, db.Close())
})
}
29 changes: 12 additions & 17 deletions db.go
@@ -957,12 +957,10 @@ func arenaSize(opt Options) int64 {
}

// buildL0Table builds a new table from the memtable.
func buildL0Table(ft flushTask, bopts table.Options) []byte {
func buildL0Table(ft flushTask, bopts table.Options) *table.Builder {
iter := ft.mt.sl.NewIterator()
defer iter.Close()
b := table.NewTableBuilder(bopts)
defer b.Close()

var vp valuePointer
for iter.SeekToFirst(); iter.Valid(); iter.Next() {
if len(ft.dropPrefixes) > 0 && hasAnyPrefixes(iter.Key(), ft.dropPrefixes) {
@@ -974,7 +972,7 @@ func buildL0Table(ft flushTask, bopts table.Options) []byte {
}
b.Add(iter.Key(), iter.Value(), vp.Len)
}
return b.Finish(true)
return b
}

type flushTask struct {
@@ -989,30 +987,26 @@ func (db *DB) handleFlushTask(ft flushTask) error {
return nil
}

dk, err := db.registry.LatestDataKey()
if err != nil {
return y.Wrapf(err, "failed to get datakey in db.handleFlushTask")
}
bopts := buildTableOptions(db.opt)
bopts.DataKey = dk
// Builder does not need cache but the same options are used for opening table.
bopts.BlockCache = db.blockCache
bopts.IndexCache = db.indexCache
tableData := buildL0Table(ft, bopts)
bopts := buildTableOptions(db)
builder := buildL0Table(ft, bopts)
defer builder.Close()

// buildL0Table can return nil if the none of the items in the skiplist are
// added to the builder. This can happen when drop prefix is set and all
// the items are skipped.
if len(tableData) == 0 {
if builder.Empty() {
builder.Finish()
return nil
}

fileID := db.lc.reserveFileID()
var tbl *table.Table
var err error
if db.opt.InMemory {
tbl, err = table.OpenInMemoryTable(tableData, fileID, &bopts)
data := builder.Finish()
tbl, err = table.OpenInMemoryTable(data, fileID, &bopts)
} else {
tbl, err = table.CreateTable(table.NewFilename(fileID, db.opt.Dir), tableData, bopts)
tbl, err = table.CreateTable(table.NewFilename(fileID, db.opt.Dir), builder)
}
if err != nil {
return y.Wrap(err, "error while creating table")
@@ -1789,6 +1783,7 @@ func (db *DB) StreamDB(outOptions Options) error {
// Stream contents of DB to the output DB.
stream := db.NewStreamAt(math.MaxUint64)
stream.LogPrefix = fmt.Sprintf("Streaming DB to new DB at %s", outDir)

stream.Send = func(kvs *pb.KVList) error {
return writer.Write(kvs)
}
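The db.go hunk above is the core of the change: buildL0Table now returns the live *table.Builder, and only the in-memory path still materializes the table as a byte slice via Finish(); the on-disk path hands the builder to table.CreateTable, which can write its blocks straight into the mmapped file. Below is a rough sketch of that builder-to-table handoff using only the public table package. It is untested against this exact revision: the option values, file name, and key/value contents are made up, and real callers obtain their options from buildTableOptions(db) (see options.go later in this diff), which also wires in the data key and the block/index caches.

```go
package main

import (
	"fmt"
	"log"

	"github.com/dgraph-io/badger/v2/options"
	"github.com/dgraph-io/badger/v2/table"
	"github.com/dgraph-io/badger/v2/y"
)

func main() {
	bopts := table.Options{
		BlockSize:          4 * 1024,
		BloomFalsePositive: 0.01,
		TableSize:          1 << 20,
		Compression:        options.None,
	}
	b := table.NewTableBuilder(bopts)
	defer b.Close()

	// Keys must be added in sorted order and carry a timestamp suffix.
	for i := 0; i < 1000; i++ {
		key := y.KeyWithTs([]byte(fmt.Sprintf("key-%05d", i)), 1)
		b.Add(key, y.ValueStruct{Value: []byte("value")}, 0)
	}
	if b.Empty() {
		b.Finish() // mirror handleFlushTask: release builder-owned buffers anyway
		return
	}

	// New in this commit: CreateTable consumes the builder itself and writes
	// its blocks directly into the mmapped table file, instead of taking a
	// fully serialized []byte produced by Finish.
	tbl, err := table.CreateTable("000001.sst", b)
	if err != nil {
		log.Fatal(err)
	}
	defer tbl.DecrRef()
	fmt.Println("table written without an intermediate copy")
}
```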
32 changes: 18 additions & 14 deletions db2_test.go
@@ -18,6 +18,7 @@ package badger

import (
"bytes"
"context"
"encoding/binary"
"flag"
"fmt"
@@ -504,7 +505,7 @@ func addToManifest(t *testing.T, db *DB, tab *table.Table, level uint32) {
// createTableWithRange function is used in TestCompactionFilePicking. It creates
// a table with key starting from start and ending with end.
func createTableWithRange(t *testing.T, db *DB, start, end int) *table.Table {
bopts := buildTableOptions(db.opt)
bopts := buildTableOptions(db)
b := table.NewTableBuilder(bopts)
defer b.Close()
nums := []int{start, end}
@@ -517,7 +518,7 @@ func createTableWithRange(t *testing.T, db *DB, start, end int) *table.Table {
}

fileID := db.lc.reserveFileID()
tab, err := table.CreateTable(table.NewFilename(fileID, db.opt.Dir), b.Finish(false), bopts)
tab, err := table.CreateTable(table.NewFilename(fileID, db.opt.Dir), b)
require.NoError(t, err)
return tab
}
@@ -987,27 +988,30 @@ func TestKeyCount(t *testing.T) {
defer db.Close()
writeSorted(db, N)
require.NoError(t, db.Close())
t.Logf("Writing DONE\n")

// Read the db
db2, err := Open(DefaultOptions(dir))
y.Check(err)
defer db.Close()
lastKey := -1
count := 0
db2.View(func(txn *Txn) error {
iopt := DefaultIteratorOptions
iopt.AllVersions = true
it := txn.NewIterator(iopt)
defer it.Close()
for it.Rewind(); it.Valid(); it.Next() {
count++
i := it.Item()
key := binary.BigEndian.Uint64(i.Key())

streams := make(map[uint32]int)
stream := db2.NewStream()
stream.Send = func(list *pb.KVList) error {
count += len(list.Kv)
for _, kv := range list.Kv {
last := streams[kv.StreamId]
key := binary.BigEndian.Uint64(kv.Key)
// The following should happen as we're writing sorted data.
require.Equalf(t, lastKey+1, int(key), "Expected key: %d, Found Key: %d", lastKey+1, int(key))
lastKey = int(key)
if last > 0 {
require.Equalf(t, last+1, int(key), "Expected key: %d, Found Key: %d", lastKey+1, int(key))
}
streams[kv.StreamId] = int(key)
}
return nil
})
}
require.NoError(t, stream.Orchestrate(context.Background()))
require.Equal(t, N, uint64(count))
}
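TestKeyCount above now checks key ordering through badger's Stream framework rather than a single iterator. For reference, a minimal, hypothetical standalone use of that public API is sketched below; the database path is illustrative and error handling is reduced to log.Fatal.

```go
package main

import (
	"context"
	"log"

	badger "github.com/dgraph-io/badger/v2"
	"github.com/dgraph-io/badger/v2/pb"
)

func main() {
	db, err := badger.Open(badger.DefaultOptions("/tmp/badger-stream-demo"))
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	count := 0
	stream := db.NewStream()
	stream.LogPrefix = "count-keys"
	stream.Send = func(list *pb.KVList) error {
		count += len(list.Kv) // Send receives batches of KVs; here we only count them
		return nil
	}
	if err := stream.Orchestrate(context.Background()); err != nil {
		log.Fatal(err)
	}
	log.Printf("saw %d key-value pairs", count)
}
```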
26 changes: 15 additions & 11 deletions db_test.go
@@ -373,6 +373,19 @@ func TestForceCompactL0(t *testing.T) {
}

func TestStreamDB(t *testing.T) {
check := func(db *DB) {
for i := 0; i < 100; i++ {
key := []byte(fmt.Sprintf("key%d", i))
val := []byte(fmt.Sprintf("val%d", i))
txn := db.NewTransactionAt(1, false)
item, err := txn.Get(key)
require.NoError(t, err)
require.EqualValues(t, val, getItemValue(t, item))
require.Equal(t, byte(0x00), item.UserMeta())
txn.Discard()
}
}

dir, err := ioutil.TempDir("", "badger-test")
require.NoError(t, err)
defer removeDir(dir)
@@ -393,6 +406,7 @@ func TestStreamDB(t *testing.T) {
require.NoError(t, writer.SetEntryAt(NewEntry(key, val).WithMeta(0x00), 1))
}
require.NoError(t, writer.Flush())
check(db)

outDir, err := ioutil.TempDir("", "badger-test")
require.NoError(t, err)
@@ -404,17 +418,7 @@ func TestStreamDB(t *testing.T) {
defer func() {
require.NoError(t, outDB.Close())
}()

for i := 0; i < 100; i++ {
key := []byte(fmt.Sprintf("key%d", i))
val := []byte(fmt.Sprintf("val%d", i))
txn := outDB.NewTransactionAt(1, false)
item, err := txn.Get(key)
require.NoError(t, err)
require.EqualValues(t, val, getItemValue(t, item))
require.Equal(t, byte(0x00), item.UserMeta())
txn.Discard()
}
check(outDB)
}

func dirSize(path string) (int64, error) {
2 changes: 1 addition & 1 deletion go.mod
@@ -7,7 +7,7 @@ go 1.12
require (
github.com/DataDog/zstd v1.4.1
github.com/cespare/xxhash v1.1.0
github.com/dgraph-io/ristretto v0.0.4-0.20201023213945-72c2139ec27f
github.com/dgraph-io/ristretto v0.0.4-0.20201103012257-4dcfe40a6fc0
github.com/dustin/go-humanize v1.0.0
github.com/golang/protobuf v1.3.1
github.com/golang/snappy v0.0.1
5 changes: 3 additions & 2 deletions go.sum
@@ -15,8 +15,8 @@ github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwc
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgraph-io/ristretto v0.0.4-0.20201023213945-72c2139ec27f h1:YPDUnM9Rkd0V41Ie43v/QoNgz5NNGcZv05UnYEnQgo4=
github.com/dgraph-io/ristretto v0.0.4-0.20201023213945-72c2139ec27f/go.mod h1:bDI4cDaalvYSji3vBVDKrn9ouDZrwN974u8ZO/AhYXs=
github.com/dgraph-io/ristretto v0.0.4-0.20201103012257-4dcfe40a6fc0 h1:5ZtQ7aGng65gFPo1sdoZI0pTpYjJDU4t+rIFFoWUOpc=
github.com/dgraph-io/ristretto v0.0.4-0.20201103012257-4dcfe40a6fc0/go.mod h1:bDI4cDaalvYSji3vBVDKrn9ouDZrwN974u8ZO/AhYXs=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
@@ -64,6 +64,7 @@ github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb6
github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg=
github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s=
github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
26 changes: 7 additions & 19 deletions levels.go
@@ -137,12 +137,10 @@ func newLevelsController(db *DB, mf *Manifest) (*levelsController, error) {
rerr = y.Wrapf(err, "Error while reading datakey")
return
}
topt := buildTableOptions(db.opt)
// Set compression from table manifest.
topt := buildTableOptions(db)
// Explicitly set Compression and DataKey based on how the table was generated.
topt.Compression = tf.Compression
topt.DataKey = dk
topt.BlockCache = db.blockCache
topt.IndexCache = db.indexCache

mf, err := z.OpenMmapFile(fname, db.opt.getFileFlags(), 0)
if err != nil {
@@ -758,17 +756,7 @@ func (s *levelsController) subcompact(it y.Iterator, kr keyRange, cd compactDef,
break
}

dk, err := s.kv.registry.LatestDataKey()
if err != nil {
inflightBuilders.Done(y.Wrapf(err, "Error while retrieving datakey in levelsController.compactBuildTables"))
return
}
bopts := buildTableOptions(s.kv.opt)
bopts.DataKey = dk
// Builder does not need cache but the same options are used for opening table.
bopts.BlockCache = s.kv.blockCache
bopts.IndexCache = s.kv.indexCache

bopts := buildTableOptions(s.kv)
// Set TableSize to the target file size for that level.
bopts.TableSize = uint64(cd.t.fileSz[cd.nextLevel.level])
builder := table.NewTableBuilder(bopts)
@@ -780,7 +768,7 @@ func (s *levelsController) subcompact(it y.Iterator, kr keyRange, cd compactDef,
// called Add() at least once, and builder is not Empty().
if builder.Empty() {
// Cleanup builder resources:
builder.Finish(false)
builder.Finish()
builder.Close()
continue
}
@@ -791,18 +779,18 @@ func (s *levelsController) subcompact(it y.Iterator, kr keyRange, cd compactDef,
break
}
go func(builder *table.Builder) {
var err error
defer builder.Close()
defer inflightBuilders.Done(err)

build := func(fileID uint64) (*table.Table, error) {
fname := table.NewFilename(fileID, s.kv.opt.Dir)
return table.CreateTable(fname, builder.Finish(false), bopts)
return table.CreateTable(fname, builder)
}

var tbl *table.Table
var err error
if s.kv.opt.InMemory {
tbl, err = table.OpenInMemoryTable(builder.Finish(true), fileID, &bopts)
tbl, err = table.OpenInMemoryTable(builder.Finish(), fileID, &bopts)
} else {
tbl, err = build(fileID)
}
4 changes: 2 additions & 2 deletions levels_test.go
@@ -46,7 +46,7 @@ func createAndOpen(db *DB, td []keyValVersion, level int) {
b.Add(key, val, 0)
}
fname := table.NewFilename(db.lc.reserveFileID(), db.opt.Dir)
tab, err := table.CreateTable(fname, b.Finish(false), opts)
tab, err := table.CreateTable(fname, b)
if err != nil {
panic(err)
}
@@ -768,7 +768,7 @@ func createEmptyTable(db *DB) *table.Table {
b.Add(y.KeyWithTs([]byte("foo"), 1), y.ValueStruct{}, 0)

// Open table in memory to avoid adding changes to manifest file.
tab, err := table.OpenInMemoryTable(b.Finish(true), db.lc.reserveFileID(), &opts)
tab, err := table.OpenInMemoryTable(b.Finish(), db.lc.reserveFileID(), &opts)
if err != nil {
panic(err)
}
2 changes: 1 addition & 1 deletion manifest_test.go
@@ -151,7 +151,7 @@ func buildTable(t *testing.T, keyValues [][]string, bopts table.Options) *table.
}, 0)
}

tbl, err := table.CreateTable(filename, b.Finish(false), bopts)
tbl, err := table.CreateTable(filename, b)
require.NoError(t, err)
return tbl
}
9 changes: 8 additions & 1 deletion options.go
@@ -22,6 +22,7 @@ import (

"github.com/dgraph-io/badger/v2/options"
"github.com/dgraph-io/badger/v2/table"
"github.com/dgraph-io/badger/v2/y"
)

// Note: If you add a new option X make sure you also add a WithX method on Options.
@@ -161,7 +162,10 @@ func DefaultOptions(path string) Options {
}
}

func buildTableOptions(opt Options) table.Options {
func buildTableOptions(db *DB) table.Options {
opt := db.opt
dk, err := db.registry.LatestDataKey()
y.Check(err)
return table.Options{
SyncWrites: opt.SyncWrites,
ReadOnly: opt.ReadOnly,
@@ -171,6 +175,9 @@
ChkMode: opt.ChecksumVerificationMode,
Compression: opt.Compression,
ZSTDCompressionLevel: opt.ZSTDCompressionLevel,
BlockCache: db.blockCache,
IndexCache: db.indexCache,
DataKey: dk,
}
}
