Use AllocatorPool in Stream and TableBuilder (#1593)
We removed the global z.Allocator pool from the z package. Instead, we now use the new z.AllocatorPool type in the places that need a pool; in this case, TableBuilder and Stream.

Also fix a memory leak in Stream.

Co-authored-by: Ibrahim Jarif <ibrahim@dgraph.io>
2 people authored and danielmai committed Nov 11, 2020

1 parent 6c07455 commit e546244
Showing 12 changed files with 157 additions and 63 deletions.
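
Before the file-by-file diff, a minimal sketch of the z.AllocatorPool lifecycle this commit adopts. This is illustrative only: NewAllocatorPool, Get, Return, Release, and the Tag field mirror calls visible in the diffs below (at the ristretto version pinned in go.mod, Get takes only a size and Tag is set separately); Allocate and the sizes and tags here are assumptions for the example.

```go
package main

import (
	"fmt"

	"github.com/dgraph-io/ristretto/z"
)

func main() {
	// A pool that keeps a bounded number of idle allocators for reuse,
	// replacing the old package-global pool in z.
	pool := z.NewAllocatorPool(8)
	defer pool.Release() // free whatever the pool still holds, as db.close() does

	// Get reuses an idle allocator if one is available, else creates one.
	a := pool.Get(1 << 20)
	a.Tag = "example" // the tag labels the allocator in leak reports

	buf := a.Allocate(512) // arena-style allocation from the allocator (assumed API)
	copy(buf, "hello")
	fmt.Printf("allocated %d bytes\n", len(buf))

	// Returning the allocator resets it for reuse; buf must not be used after this.
	pool.Return(a)
}
```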
1 change: 0 additions & 1 deletion badger/main.go
@@ -49,7 +49,6 @@ func main() {
 	z.Free(out)
 
 	cmd.Execute()
-	z.Done()
 	fmt.Printf("Num Allocated Bytes at program end: %s\n",
 		humanize.IBytes(uint64(z.NumAllocBytes())))
 	if z.NumAllocBytes() > 0 {
4 changes: 4 additions & 0 deletions db.go
@@ -95,6 +95,7 @@ type DB struct {
 	registry   *KeyRegistry
 	blockCache *ristretto.Cache
 	indexCache *ristretto.Cache
+	allocPool  *z.AllocatorPool
 }
 
 const (
@@ -218,6 +219,7 @@ func Open(opt Options) (*DB, error) {
 		valueDirGuard: valueDirLockGuard,
 		orc:           newOracle(opt),
 		pub:           newPublisher(),
+		allocPool:     z.NewAllocatorPool(8),
 	}
 	// Cleanup all the goroutines started by badger in case of an error.
 	defer func() {
@@ -476,6 +478,8 @@ func (db *DB) IsClosed() bool {
 }
 
 func (db *DB) close() (err error) {
+	defer db.allocPool.Release()
+
 	db.opt.Debugf("Closing database")
 	db.opt.Infof("Lifetime L0 stalled for: %s\n", time.Duration(db.lc.l0stallsMs))

2 changes: 1 addition & 1 deletion db_test.go
@@ -532,7 +532,7 @@ func TestGetMore(t *testing.T) {
 	data := func(i int) []byte {
 		return []byte(fmt.Sprintf("%b", i))
 	}
-	n := 500000
+	n := 200000
 	m := 45 // Increasing would cause ErrTxnTooBig
 	for i := 0; i < n; i += m {
 		if (i % 10000) == 0 {
2 changes: 1 addition & 1 deletion go.mod
@@ -7,7 +7,7 @@ go 1.12
 require (
 	github.com/DataDog/zstd v1.4.1
 	github.com/cespare/xxhash v1.1.0
-	github.com/dgraph-io/ristretto v0.0.4-0.20201105000107-750f5be31aad
+	github.com/dgraph-io/ristretto v0.0.4-0.20201111190620-1040b7ded521
 	github.com/dustin/go-humanize v1.0.0
 	github.com/golang/protobuf v1.3.1
 	github.com/golang/snappy v0.0.1
4 changes: 2 additions & 2 deletions go.sum
@@ -15,8 +15,8 @@ github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwc
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
-github.com/dgraph-io/ristretto v0.0.4-0.20201105000107-750f5be31aad h1:BN2A+lqBwLx57/l1MUv3iKLIY8XddNkZg2zZN/BFM1I=
-github.com/dgraph-io/ristretto v0.0.4-0.20201105000107-750f5be31aad/go.mod h1:bDI4cDaalvYSji3vBVDKrn9ouDZrwN974u8ZO/AhYXs=
+github.com/dgraph-io/ristretto v0.0.4-0.20201111190620-1040b7ded521 h1:81yY9QfuVfSVccqD7cLA/JohhMX+TQCfSjmGS7jUJCc=
+github.com/dgraph-io/ristretto v0.0.4-0.20201111190620-1040b7ded521/go.mod h1:bDI4cDaalvYSji3vBVDKrn9ouDZrwN974u8ZO/AhYXs=
 github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA=
 github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
 github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
1 change: 1 addition & 0 deletions options.go
@@ -177,6 +177,7 @@ func buildTableOptions(db *DB) table.Options {
 		ZSTDCompressionLevel: opt.ZSTDCompressionLevel,
 		BlockCache:           db.blockCache,
 		IndexCache:           db.indexCache,
+		AllocPool:            db.allocPool,
 		DataKey:              dk,
 	}
 }
58 changes: 37 additions & 21 deletions stream.go
@@ -92,6 +92,8 @@ type Stream struct {
 	// Use allocators to generate KVs.
 	allocatorsMu sync.RWMutex
 	allocators   map[int]*z.Allocator
+
+	allocPool *z.AllocatorPool
 }
 
 func (st *Stream) Allocator(threadId int) *z.Allocator {
@@ -182,15 +184,9 @@ func (st *Stream) produceRanges(ctx context.Context) {

 func (st *Stream) newAllocator(threadId int) *z.Allocator {
 	st.allocatorsMu.Lock()
-	var a *z.Allocator
-	if cur, ok := st.allocators[threadId]; ok && cur.Size() == 0 {
-		a = cur // Reuse.
-	} else {
-		// Current allocator has been used already. Create a new one.
-		a = z.NewAllocator(batchSize)
-		// a.Tag = fmt.Sprintf("Stream %d: %s", threadId, st.LogPrefix)
-		st.allocators[threadId] = a
-	}
+	a := st.allocPool.Get(batchSize)
+	a.Tag = "Stream " + st.LogPrefix
+	st.allocators[threadId] = a
 	st.allocatorsMu.Unlock()
 	return a
 }
@@ -206,6 +202,12 @@ func (st *Stream) produceKVs(ctx context.Context, threadId int) error {
 	}
 	defer txn.Discard()
 
+	// produceKVs is running iterate serially. So, we can define the outList here.
+	outList := new(pb.KVList)
+	// There would be one last remaining Allocator for this threadId when produceKVs finishes, which
+	// would be released by Orchestrate.
+	outList.AllocRef = st.newAllocator(threadId).Ref
+
 	iterate := func(kr keyRange) error {
 		iterOpts := DefaultIteratorOptions
 		iterOpts.AllVersions = true
@@ -218,9 +220,6 @@ func (st *Stream) produceKVs(ctx context.Context, threadId int) error {
 		// This unique stream id is used to identify all the keys from this iteration.
 		streamId := atomic.AddUint32(&st.nextStreamId, 1)
 
-		outList := new(pb.KVList)
-		outList.AllocRef = st.newAllocator(threadId).Ref
-
 		sendIt := func() error {
 			select {
 			case st.kvChan <- outList:
@@ -280,10 +279,7 @@ func (st *Stream) produceKVs(ctx context.Context, threadId int) error {
 				StreamDone: true,
 			})
 		}
-		if len(outList.Kv) > 0 {
-			return sendIt()
-		}
-		return nil
+		return sendIt()
 	}
 
 	for {
@@ -309,6 +305,16 @@ func (st *Stream) streamKVs(ctx context.Context) error {
 	defer t.Stop()
 	now := time.Now()
 
+	allocs := make(map[uint64]struct{})
+	returnAllocs := func() {
+		for ref := range allocs {
+			a := z.AllocatorFrom(ref)
+			st.allocPool.Return(a)
+			delete(allocs, ref)
+		}
+	}
+	defer returnAllocs()
+
 	sendBatch := func(batch *pb.KVList) error {
 		sz := uint64(proto.Size(batch))
 		bytesSent += sz
@@ -320,6 +326,7 @@ func (st *Stream) streamKVs(ctx context.Context) error {
 		st.db.opt.Infof("%s Created batch of size: %s in %s.\n",
 			st.LogPrefix, humanize.Bytes(sz), time.Since(t))
 
+		returnAllocs()
 		return nil
 	}

@@ -340,6 +347,7 @@ func (st *Stream) streamKVs(ctx context.Context) error {
 			}
 			y.AssertTrue(kvs != nil)
 			batch.Kv = append(batch.Kv, kvs.Kv...)
+			allocs[kvs.AllocRef] = struct{}{}
 
 		default:
 			break loop
@@ -372,6 +380,7 @@ outer:
 		}
 		y.AssertTrue(kvs != nil)
 		batch = kvs
+		allocs[kvs.AllocRef] = struct{}{}
 
 		// Otherwise, slurp more keys into this batch.
 		if err := slurp(batch); err != nil {
@@ -391,6 +400,17 @@ outer:
 // are serial. In case any of these steps encounter an error, Orchestrate would stop execution and
 // return that error. Orchestrate can be called multiple times, but in serial order.
 func (st *Stream) Orchestrate(ctx context.Context) error {
+	st.allocPool = z.NewAllocatorPool(st.NumGo)
+	defer func() {
+		for _, a := range st.allocators {
+			// Using AllocatorFrom is better because if the allocator is already freed up, it would
+			// return nil.
+			a = z.AllocatorFrom(a.Ref)
+			a.Release()
+		}
+		st.allocPool.Release()
+	}()
+
 	st.rangeCh = make(chan keyRange, 3) // Contains keys for posting lists.
 
 	// kvChan should only have a small capacity to ensure that we don't buffer up too much data if
@@ -439,17 +459,13 @@ func (st *Stream) Orchestrate(ctx context.Context) error {

 	// Wait for key streaming to be over.
 	err := <-kvErr
-
-	for _, a := range st.allocators {
-		a.Release()
-	}
 	return err
 }
 
 func (db *DB) newStream() *Stream {
 	return &Stream{
 		db:         db,
-		NumGo:      16,
+		NumGo:      8,
 		LogPrefix:  "Badger.Stream",
 		allocators: make(map[int]*z.Allocator),
 	}
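
The stream.go changes above pass allocators by reference ID rather than by pointer: produceKVs stamps each outgoing pb.KVList with the Ref of its backing allocator, streamKVs collects those refs and returns the allocators to the pool once a batch has been sent, and Orchestrate's deferred cleanup resolves refs through z.AllocatorFrom, which returns nil for an already-released allocator, so nothing is freed twice. A condensed, self-contained sketch of that round trip; the channel, sizes, and tag are illustrative, not badger's actual flow:

```go
package main

import (
	"fmt"

	"github.com/dgraph-io/ristretto/z"
)

func main() {
	pool := z.NewAllocatorPool(2)
	defer pool.Release()

	refCh := make(chan uint64, 1)

	// Producer side (like produceKVs): back a batch with a pooled allocator
	// and ship only its uint64 Ref along with the data.
	a := pool.Get(1 << 10)
	a.Tag = "Stream example"
	_ = a.Allocate(64) // stands in for the KV bytes of one batch (assumed API)
	refCh <- a.Ref

	// Consumer side (like returnAllocs in streamKVs): once the batch has been
	// sent downstream, resolve the Ref and recycle the allocator. AllocatorFrom
	// returns nil for an already-released Ref, which is why the final cleanup
	// in Orchestrate cannot double-free.
	if got := z.AllocatorFrom(<-refCh); got != nil {
		pool.Return(got)
	}
	fmt.Println("allocator returned to pool")
}
```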
52 changes: 52 additions & 0 deletions stream_test.go
@@ -25,6 +25,7 @@ import (
"strings"
"testing"

"github.com/dgraph-io/badger/v2/pb"
bpb "github.com/dgraph-io/badger/v2/pb"
"github.com/dgraph-io/badger/v2/y"
"github.com/golang/protobuf/proto"
@@ -265,3 +266,54 @@ func TestBigStream(t *testing.T) {
 	}
 	require.NoError(t, db.Close())
 }
+
+// There was a bug in the stream writer code which would cause allocators to be
+// freed up twice if the default keyToList was not used. This test verifies that issue.
+func TestStreamCustomKeyToList(t *testing.T) {
+	dir, err := ioutil.TempDir("", "badger-test")
+	require.NoError(t, err)
+	defer removeDir(dir)
+
+	db, err := OpenManaged(DefaultOptions(dir))
+	require.NoError(t, err)
+
+	var count int
+	for _, key := range []string{"p0", "p1", "p2"} {
+		for i := 1; i <= 100; i++ {
+			txn := db.NewTransactionAt(math.MaxUint64, true)
+			require.NoError(t, txn.SetEntry(NewEntry([]byte(key), value(i))))
+			count++
+			require.NoError(t, txn.CommitAt(uint64(i), nil))
+		}
+	}
+
+	stream := db.NewStreamAt(math.MaxUint64)
+	stream.LogPrefix = "Testing"
+	stream.KeyToList = func(key []byte, itr *Iterator) (*pb.KVList, error) {
+		item := itr.Item()
+		val, err := item.ValueCopy(nil)
+		if err != nil {
+			return nil, err
+		}
+		kv := &pb.KV{
+			Key:   y.Copy(item.Key()),
+			Value: val,
+		}
+		return &pb.KVList{
+			Kv: []*pb.KV{kv},
+		}, nil
+	}
+	res := map[string]struct{}{"p0": {}, "p1": {}, "p2": {}}
+	stream.Send = func(list *pb.KVList) error {
+		for _, kv := range list.Kv {
+			key := string(kv.Key)
+			if _, ok := res[key]; !ok {
+				panic(fmt.Sprintf("%s key not found", key))
+			}
+			delete(res, key)
+		}
+		return nil
+	}
+	require.NoError(t, stream.Orchestrate(ctxb))
+	require.Zero(t, len(res))
+}
5 changes: 3 additions & 2 deletions stream_writer.go
@@ -270,6 +270,7 @@ func (sw *StreamWriter) Cancel() {
 type sortedWriter struct {
 	db       *DB
 	throttle *y.Throttle
+	opts     table.Options
 
 	builder *table.Builder
 	lastKey []byte
@@ -286,6 +287,7 @@ func (sw *StreamWriter) newWriter(streamID uint32) (*sortedWriter, error) {
 	}
 	w := &sortedWriter{
 		db:       sw.db,
+		opts:     bopts,
 		streamID: streamID,
 		throttle: sw.throttle,
 		builder:  table.NewTableBuilder(bopts),
@@ -379,8 +381,7 @@ func (w *sortedWriter) send(done bool) error {
 		return nil
 	}
 
-	bopts := buildTableOptions(w.db)
-	w.builder = table.NewTableBuilder(bopts)
+	w.builder = table.NewTableBuilder(w.opts)
 	return nil
 }
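
The sortedWriter change above is a capture-once pattern: newWriter builds table.Options a single time (now carrying the shared AllocPool) and stores it on the writer, so send reuses the cached options for each replacement builder instead of calling buildTableOptions per table. A generic, self-contained sketch of the pattern; every name here is illustrative, not badger's:

```go
package main

import "fmt"

// opts stands in for table.Options: cheap to copy, relatively costly to build.
type opts struct{ compression int }

func buildOpts() opts {
	fmt.Println("building options") // pretend this fetches keys, caches, pools
	return opts{compression: 3}
}

type builder struct{ o opts }

// writer mirrors sortedWriter: it captures its options once at construction
// and reuses them whenever it swaps in a fresh builder.
type writer struct {
	o opts
	b *builder
}

func newWriter() *writer {
	o := buildOpts() // built exactly once, as in newWriter above
	return &writer{o: o, b: &builder{o: o}}
}

func (w *writer) send() {
	// ... flush w.b to disk ...
	w.b = &builder{o: w.o} // reuse cached options; no rebuild per table
}

func main() {
	w := newWriter()
	w.send()
	w.send()
	fmt.Println("compression:", w.b.o.compression)
}
```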
