From e546244cc95dcea27260702b3fb044aa8a1698db Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Wed, 11 Nov 2020 13:02:30 -0800 Subject: [PATCH] Use AllocatorPool in Stream and TableBuilder (#1593) We removed the global z.Allocator pool from z package. Instead, we now use a new z.AllocatorPool class in the places which need a pool. In this case, we brought it to TableBuilder and Stream. Fix up a memory leak in Stream. Co-authored-by: Ibrahim Jarif --- badger/main.go | 1 - db.go | 4 ++++ db_test.go | 2 +- go.mod | 2 +- go.sum | 4 ++-- options.go | 1 + stream.go | 58 ++++++++++++++++++++++++++++++------------------ stream_test.go | 52 +++++++++++++++++++++++++++++++++++++++++++ stream_writer.go | 5 +++-- table/builder.go | 37 +++++++++++++++--------------- table/table.go | 6 +++-- test.sh | 48 ++++++++++++++++++++++++++------------- 12 files changed, 157 insertions(+), 63 deletions(-) diff --git a/badger/main.go b/badger/main.go index ed76978fa..c284f5aec 100644 --- a/badger/main.go +++ b/badger/main.go @@ -49,7 +49,6 @@ func main() { z.Free(out) cmd.Execute() - z.Done() fmt.Printf("Num Allocated Bytes at program end: %s\n", humanize.IBytes(uint64(z.NumAllocBytes()))) if z.NumAllocBytes() > 0 { diff --git a/db.go b/db.go index 249e14c77..079efcded 100644 --- a/db.go +++ b/db.go @@ -95,6 +95,7 @@ type DB struct { registry *KeyRegistry blockCache *ristretto.Cache indexCache *ristretto.Cache + allocPool *z.AllocatorPool } const ( @@ -218,6 +219,7 @@ func Open(opt Options) (*DB, error) { valueDirGuard: valueDirLockGuard, orc: newOracle(opt), pub: newPublisher(), + allocPool: z.NewAllocatorPool(8), } // Cleanup all the goroutines started by badger in case of an error. defer func() { @@ -476,6 +478,8 @@ func (db *DB) IsClosed() bool { } func (db *DB) close() (err error) { + defer db.allocPool.Release() + db.opt.Debugf("Closing database") db.opt.Infof("Lifetime L0 stalled for: %s\n", time.Duration(db.lc.l0stallsMs)) diff --git a/db_test.go b/db_test.go index 53aeb3423..f2b38c2bb 100644 --- a/db_test.go +++ b/db_test.go @@ -532,7 +532,7 @@ func TestGetMore(t *testing.T) { data := func(i int) []byte { return []byte(fmt.Sprintf("%b", i)) } - n := 500000 + n := 200000 m := 45 // Increasing would cause ErrTxnTooBig for i := 0; i < n; i += m { if (i % 10000) == 0 { diff --git a/go.mod b/go.mod index bdb784454..e8b5c83bf 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ go 1.12 require ( github.com/DataDog/zstd v1.4.1 github.com/cespare/xxhash v1.1.0 - github.com/dgraph-io/ristretto v0.0.4-0.20201105000107-750f5be31aad + github.com/dgraph-io/ristretto v0.0.4-0.20201111190620-1040b7ded521 github.com/dustin/go-humanize v1.0.0 github.com/golang/protobuf v1.3.1 github.com/golang/snappy v0.0.1 diff --git a/go.sum b/go.sum index 657325fbb..f02228044 100644 --- a/go.sum +++ b/go.sum @@ -15,8 +15,8 @@ github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwc github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/dgraph-io/ristretto v0.0.4-0.20201105000107-750f5be31aad h1:BN2A+lqBwLx57/l1MUv3iKLIY8XddNkZg2zZN/BFM1I= -github.com/dgraph-io/ristretto v0.0.4-0.20201105000107-750f5be31aad/go.mod h1:bDI4cDaalvYSji3vBVDKrn9ouDZrwN974u8ZO/AhYXs= +github.com/dgraph-io/ristretto v0.0.4-0.20201111190620-1040b7ded521 h1:81yY9QfuVfSVccqD7cLA/JohhMX+TQCfSjmGS7jUJCc= +github.com/dgraph-io/ristretto v0.0.4-0.20201111190620-1040b7ded521/go.mod h1:bDI4cDaalvYSji3vBVDKrn9ouDZrwN974u8ZO/AhYXs= github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA= github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= diff --git a/options.go b/options.go index 45bd5e8ff..aa1701069 100644 --- a/options.go +++ b/options.go @@ -177,6 +177,7 @@ func buildTableOptions(db *DB) table.Options { ZSTDCompressionLevel: opt.ZSTDCompressionLevel, BlockCache: db.blockCache, IndexCache: db.indexCache, + AllocPool: db.allocPool, DataKey: dk, } } diff --git a/stream.go b/stream.go index d3544662a..972226e4c 100644 --- a/stream.go +++ b/stream.go @@ -92,6 +92,8 @@ type Stream struct { // Use allocators to generate KVs. allocatorsMu sync.RWMutex allocators map[int]*z.Allocator + + allocPool *z.AllocatorPool } func (st *Stream) Allocator(threadId int) *z.Allocator { @@ -182,15 +184,9 @@ func (st *Stream) produceRanges(ctx context.Context) { func (st *Stream) newAllocator(threadId int) *z.Allocator { st.allocatorsMu.Lock() - var a *z.Allocator - if cur, ok := st.allocators[threadId]; ok && cur.Size() == 0 { - a = cur // Reuse. - } else { - // Current allocator has been used already. Create a new one. - a = z.NewAllocator(batchSize) - // a.Tag = fmt.Sprintf("Stream %d: %s", threadId, st.LogPrefix) - st.allocators[threadId] = a - } + a := st.allocPool.Get(batchSize) + a.Tag = "Stream " + st.LogPrefix + st.allocators[threadId] = a st.allocatorsMu.Unlock() return a } @@ -206,6 +202,12 @@ func (st *Stream) produceKVs(ctx context.Context, threadId int) error { } defer txn.Discard() + // produceKVs is running iterate serially. So, we can define the outList here. + outList := new(pb.KVList) + // There would be one last remaining Allocator for this threadId when produceKVs finishes, which + // would be released by Orchestrate. + outList.AllocRef = st.newAllocator(threadId).Ref + iterate := func(kr keyRange) error { iterOpts := DefaultIteratorOptions iterOpts.AllVersions = true @@ -218,9 +220,6 @@ func (st *Stream) produceKVs(ctx context.Context, threadId int) error { // This unique stream id is used to identify all the keys from this iteration. streamId := atomic.AddUint32(&st.nextStreamId, 1) - outList := new(pb.KVList) - outList.AllocRef = st.newAllocator(threadId).Ref - sendIt := func() error { select { case st.kvChan <- outList: @@ -280,10 +279,7 @@ func (st *Stream) produceKVs(ctx context.Context, threadId int) error { StreamDone: true, }) } - if len(outList.Kv) > 0 { - return sendIt() - } - return nil + return sendIt() } for { @@ -309,6 +305,16 @@ func (st *Stream) streamKVs(ctx context.Context) error { defer t.Stop() now := time.Now() + allocs := make(map[uint64]struct{}) + returnAllocs := func() { + for ref := range allocs { + a := z.AllocatorFrom(ref) + st.allocPool.Return(a) + delete(allocs, ref) + } + } + defer returnAllocs() + sendBatch := func(batch *pb.KVList) error { sz := uint64(proto.Size(batch)) bytesSent += sz @@ -320,6 +326,7 @@ func (st *Stream) streamKVs(ctx context.Context) error { st.db.opt.Infof("%s Created batch of size: %s in %s.\n", st.LogPrefix, humanize.Bytes(sz), time.Since(t)) + returnAllocs() return nil } @@ -340,6 +347,7 @@ func (st *Stream) streamKVs(ctx context.Context) error { } y.AssertTrue(kvs != nil) batch.Kv = append(batch.Kv, kvs.Kv...) + allocs[kvs.AllocRef] = struct{}{} default: break loop @@ -372,6 +380,7 @@ outer: } y.AssertTrue(kvs != nil) batch = kvs + allocs[kvs.AllocRef] = struct{}{} // Otherwise, slurp more keys into this batch. if err := slurp(batch); err != nil { @@ -391,6 +400,17 @@ outer: // are serial. In case any of these steps encounter an error, Orchestrate would stop execution and // return that error. Orchestrate can be called multiple times, but in serial order. func (st *Stream) Orchestrate(ctx context.Context) error { + st.allocPool = z.NewAllocatorPool(st.NumGo) + defer func() { + for _, a := range st.allocators { + // Using AllocatorFrom is better because if the allocator is already freed up, it would + // return nil. + a = z.AllocatorFrom(a.Ref) + a.Release() + } + st.allocPool.Release() + }() + st.rangeCh = make(chan keyRange, 3) // Contains keys for posting lists. // kvChan should only have a small capacity to ensure that we don't buffer up too much data if @@ -439,17 +459,13 @@ func (st *Stream) Orchestrate(ctx context.Context) error { // Wait for key streaming to be over. err := <-kvErr - - for _, a := range st.allocators { - a.Release() - } return err } func (db *DB) newStream() *Stream { return &Stream{ db: db, - NumGo: 16, + NumGo: 8, LogPrefix: "Badger.Stream", allocators: make(map[int]*z.Allocator), } diff --git a/stream_test.go b/stream_test.go index d6bd5e568..536966008 100644 --- a/stream_test.go +++ b/stream_test.go @@ -25,6 +25,7 @@ import ( "strings" "testing" + "github.com/dgraph-io/badger/v2/pb" bpb "github.com/dgraph-io/badger/v2/pb" "github.com/dgraph-io/badger/v2/y" "github.com/golang/protobuf/proto" @@ -265,3 +266,54 @@ func TestBigStream(t *testing.T) { } require.NoError(t, db.Close()) } + +// There was a bug in the stream writer code which would cause allocators to be +// freed up twice if the default keyToList was not used. This test verifies that issue. +func TestStreamCustomKeyToList(t *testing.T) { + dir, err := ioutil.TempDir("", "badger-test") + require.NoError(t, err) + defer removeDir(dir) + + db, err := OpenManaged(DefaultOptions(dir)) + require.NoError(t, err) + + var count int + for _, key := range []string{"p0", "p1", "p2"} { + for i := 1; i <= 100; i++ { + txn := db.NewTransactionAt(math.MaxUint64, true) + require.NoError(t, txn.SetEntry(NewEntry([]byte(key), value(i)))) + count++ + require.NoError(t, txn.CommitAt(uint64(i), nil)) + } + } + + stream := db.NewStreamAt(math.MaxUint64) + stream.LogPrefix = "Testing" + stream.KeyToList = func(key []byte, itr *Iterator) (*pb.KVList, error) { + item := itr.Item() + val, err := item.ValueCopy(nil) + if err != nil { + return nil, err + } + kv := &pb.KV{ + Key: y.Copy(item.Key()), + Value: val, + } + return &pb.KVList{ + Kv: []*pb.KV{kv}, + }, nil + } + res := map[string]struct{}{"p0": {}, "p1": {}, "p2": {}} + stream.Send = func(list *pb.KVList) error { + for _, kv := range list.Kv { + key := string(kv.Key) + if _, ok := res[key]; !ok { + panic(fmt.Sprintf("%s key not found", key)) + } + delete(res, key) + } + return nil + } + require.NoError(t, stream.Orchestrate(ctxb)) + require.Zero(t, len(res)) +} diff --git a/stream_writer.go b/stream_writer.go index af1335920..1b1ef17f8 100644 --- a/stream_writer.go +++ b/stream_writer.go @@ -270,6 +270,7 @@ func (sw *StreamWriter) Cancel() { type sortedWriter struct { db *DB throttle *y.Throttle + opts table.Options builder *table.Builder lastKey []byte @@ -286,6 +287,7 @@ func (sw *StreamWriter) newWriter(streamID uint32) (*sortedWriter, error) { } w := &sortedWriter{ db: sw.db, + opts: bopts, streamID: streamID, throttle: sw.throttle, builder: table.NewTableBuilder(bopts), @@ -379,8 +381,7 @@ func (w *sortedWriter) send(done bool) error { return nil } - bopts := buildTableOptions(w.db) - w.builder = table.NewTableBuilder(bopts) + w.builder = table.NewTableBuilder(w.opts) return nil } diff --git a/table/builder.go b/table/builder.go index 925821a78..5f400385f 100644 --- a/table/builder.go +++ b/table/builder.go @@ -86,7 +86,7 @@ type Builder struct { lenOffsets uint32 estimatedSize uint32 keyHashes []uint32 // Used for building the bloomfilter. - opt *Options + opts *Options maxVersion uint64 onDiskSize uint32 @@ -127,17 +127,18 @@ func NewTableBuilder(opts Options) *Builder { sz = maxAllocatorInitialSz } b := &Builder{ - alloc: z.GetAllocatorFromPool(sz), - opt: &opts, + alloc: opts.AllocPool.Get(sz), + opts: &opts, } + b.alloc.Tag = "Builder" b.curBlock = &bblock{ data: b.alloc.Allocate(opts.BlockSize + padding), } - b.opt.tableCapacity = uint64(float64(b.opt.TableSize) * 0.9) + b.opts.tableCapacity = uint64(float64(b.opts.TableSize) * 0.95) // If encryption or compression is not enabled, do not start compression/encryption goroutines // and write directly to the buffer. - if b.opt.Compression == options.None && b.opt.DataKey == nil { + if b.opts.Compression == options.None && b.opts.DataKey == nil { return b } @@ -154,7 +155,7 @@ func NewTableBuilder(opts Options) *Builder { func (b *Builder) handleBlock() { defer b.wg.Done() - doCompress := b.opt.Compression != options.None + doCompress := b.opts.Compression != options.None for item := range b.blockChan { // Extract the block. blockBuf := item.data[:item.end] @@ -185,7 +186,7 @@ func (b *Builder) handleBlock() { // Close closes the TableBuilder. func (b *Builder) Close() { - z.ReturnAllocator(b.alloc) + b.opts.AllocPool.Return(b.alloc) } // Empty returns whether it's empty. @@ -313,7 +314,7 @@ func (b *Builder) shouldFinishBlock(key []byte, value y.ValueStruct) bool { // Integer overflow check for table size. y.AssertTrue(uint64(b.curBlock.end)+uint64(estimatedSize) < math.MaxUint32) - return estimatedSize > uint32(b.opt.BlockSize) + return estimatedSize > uint32(b.opts.BlockSize) } // Add adds a key-value pair to the block. @@ -322,7 +323,7 @@ func (b *Builder) Add(key []byte, value y.ValueStruct, valueLen uint32) { b.finishBlock() // Create a new block and start writing. b.curBlock = &bblock{ - data: b.alloc.Allocate(b.opt.BlockSize + padding), + data: b.alloc.Allocate(b.opts.BlockSize + padding), } } b.addHelper(key, value, valueLen) @@ -338,7 +339,7 @@ func (b *Builder) Add(key []byte, value y.ValueStruct, valueLen uint32) { func (b *Builder) ReachedCapacity() bool { // If encryption/compression is enabled then use the compresssed size. sumBlockSizes := atomic.LoadUint32(&b.compressedSize) - if b.opt.Compression == options.None && b.opt.DataKey == nil { + if b.opts.Compression == options.None && b.opts.DataKey == nil { sumBlockSizes = b.uncompressedSize } blocksSize := sumBlockSizes + // actual length of current buffer @@ -351,7 +352,7 @@ func (b *Builder) ReachedCapacity() bool { 4 + // Index length b.lenOffsets - return uint64(estimateSz) > b.opt.tableCapacity + return uint64(estimateSz) > b.opts.tableCapacity } // Finish finishes the table by appending the index. @@ -412,8 +413,8 @@ func (b *Builder) Done() buildData { } var f y.Filter - if b.opt.BloomFalsePositive > 0 { - bits := y.BloomBitsPerKey(len(b.keyHashes), b.opt.BloomFalsePositive) + if b.opts.BloomFalsePositive > 0 { + bits := y.BloomBitsPerKey(len(b.keyHashes), b.opts.BloomFalsePositive) f = y.NewFilter(b.keyHashes, bits) } index, dataSize := b.buildIndex(f) @@ -455,11 +456,11 @@ func (b *Builder) calculateChecksum(data []byte) []byte { // DataKey returns datakey of the builder. func (b *Builder) DataKey() *pb.DataKey { - return b.opt.DataKey + return b.opts.DataKey } func (b *Builder) Opts() *Options { - return b.opt + return b.opts } // encrypt will encrypt the given data and appends IV to the end of the encrypted data. @@ -483,12 +484,12 @@ func (b *Builder) encrypt(data []byte) ([]byte, error) { // shouldEncrypt tells us whether to encrypt the data or not. // We encrypt only if the data key exist. Otherwise, not. func (b *Builder) shouldEncrypt() bool { - return b.opt.DataKey != nil + return b.opts.DataKey != nil } // compressData compresses the given data. func (b *Builder) compressData(data []byte) ([]byte, error) { - switch b.opt.Compression { + switch b.opts.Compression { case options.None: return data, nil case options.Snappy: @@ -498,7 +499,7 @@ func (b *Builder) compressData(data []byte) ([]byte, error) { case options.ZSTD: sz := y.ZSTDCompressBound(len(data)) dst := b.alloc.Allocate(sz) - return y.ZSTDCompress(dst, data, b.opt.ZSTDCompressionLevel) + return y.ZSTDCompress(dst, data, b.opts.ZSTDCompressionLevel) } return nil, errors.New("Unsupported compression type") } diff --git a/table/table.go b/table/table.go index a28bd33c1..0ff60c22b 100644 --- a/table/table.go +++ b/table/table.go @@ -80,6 +80,8 @@ type Options struct { BlockCache *ristretto.Cache IndexCache *ristretto.Cache + AllocPool *z.AllocatorPool + // ZSTDCompressionLevel is the ZSTD compression level used for compressing blocks. ZSTDCompressionLevel int } @@ -241,12 +243,12 @@ func CreateTable(fname string, builder *Builder) (*Table, error) { written := bd.Copy(mf.Data) y.AssertTrue(written == len(mf.Data)) - if builder.opt.SyncWrites { + if builder.opts.SyncWrites { if err := z.Msync(mf.Data); err != nil { return nil, y.Wrapf(err, "while calling msync on %s", fname) } } - return OpenTable(mf, *builder.opt) + return OpenTable(mf, *builder.opts) } // OpenTable assumes file has only one table and opens it. Takes ownership of fd upon function diff --git a/test.sh b/test.sh index 9077b7e96..9b4b94698 100755 --- a/test.sh +++ b/test.sh @@ -36,17 +36,19 @@ function InstallJemalloc() { popd } +tags="-tags=jemalloc" + # Ensure that we can compile the binary. pushd badger -go build -v . +go build -v $tags . popd -tags="-tags=jemalloc" # tags="" InstallJemalloc # Run the memory intensive tests first. manual() { + timeout="-timeout 2m" echo "==> Running package tests for $packages" set -e for pkg in $packages; do @@ -59,10 +61,10 @@ manual() { # Run the special Truncate test. rm -rf p set -e - go test $tags -run='TestTruncateVlogNoClose$' --manual=true + go test $tags $timeout -run='TestTruncateVlogNoClose$' --manual=true truncate --size=4096 p/000000.vlog - go test $tags -run='TestTruncateVlogNoClose2$' --manual=true - go test $tags -run='TestTruncateVlogNoClose3$' --manual=true + go test $tags $timeout -run='TestTruncateVlogNoClose2$' --manual=true + go test $tags $timeout -run='TestTruncateVlogNoClose3$' --manual=true rm -rf p # TODO(ibrahim): Let's make these tests have Manual prefix. @@ -71,14 +73,14 @@ manual() { # TestValueGCManaged # TestDropPrefix # TestDropAllManaged - go test $tags -run='TestBigKeyValuePairs$' --manual=true - go test $tags -run='TestPushValueLogLimit' --manual=true - go test $tags -run='TestKeyCount' --manual=true - go test $tags -run='TestIteratePrefix' --manual=true - go test $tags -run='TestIterateParallel' --manual=true - go test $tags -run='TestBigStream' --manual=true - go test $tags -run='TestGoroutineLeak' --manual=true - go test $tags -run='TestGetMore' --manual=true + go test $tags $timeout -run='TestBigKeyValuePairs$' --manual=true + go test $tags $timeout -run='TestPushValueLogLimit' --manual=true + go test $tags $timeout -run='TestKeyCount' --manual=true + go test $tags $timeout -run='TestIteratePrefix' --manual=true + go test $tags $timeout -run='TestIterateParallel' --manual=true + go test $tags $timeout -run='TestBigStream' --manual=true + go test $tags $timeout -run='TestGoroutineLeak' --manual=true + go test $tags $timeout -run='TestGetMore' --manual=true echo "==> DONE manual tests" } @@ -89,11 +91,27 @@ root() { echo "==> Running root level tests." set -e - go test $tags -timeout=25m . -race -parallel 16 + go test $tags -timeout=25m . -v -race -parallel 16 echo "==> DONE root level tests" } +stream() { + pushd badger + baseDir=$(mktemp -d -p .) + ./badger benchmark write -s --dir=$baseDir/test | tee $baseDir/log.txt + ./badger stream --dir=$baseDir/test -o "$baseDir/test2" | tee --append $baseDir/log.txt + count=$(cat "$baseDir/log.txt" | grep "at program end: 0 B" | wc -l) + rm -rf $baseDir + if [ $count -ne 2 ]; then + echo "LEAK detected in Badger stream." + return 1 + fi + echo "==> DONE stream test" + return 0 +} + +export -f stream export -f manual export -f root -parallel --halt now,fail=1 --progress --line-buffer ::: manual root +parallel --halt now,fail=1 --progress --line-buffer ::: stream manual root