Use AllocatorPool in Stream and TableBuilder #1593

Merged · 10 commits · Nov 11, 2020
Changes from all commits
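Summary of the change: a shared z.AllocatorPool (from the ristretto z package) is threaded through DB, Stream, and the table options, so Stream now fetches its per-batch allocators from a pool and returns them once each batch is sent, instead of creating and releasing a fresh allocator every time. On the TableBuilder side only the wiring is visible here (options.go); the consuming code lives in the table package. Below is a minimal sketch of the pool contract as used in this diff; z.Allocator.Allocate is an assumption on my part, while the other calls all appear verbatim in the hunks that follow.

package main

import (
	"fmt"

	"github.com/dgraph-io/ristretto/z"
)

func main() {
	// A pool that parks idle allocators for reuse; db.go sizes it at 8,
	// stream.go at st.NumGo.
	pool := z.NewAllocatorPool(8)
	defer pool.Release() // mirrors db.close() and Orchestrate's deferred cleanup

	a := pool.Get(1 << 20) // a reused allocator, or a fresh one with this size hint
	a.Tag = "example"      // stream.go tags allocators with its log prefix

	buf := a.Allocate(512) // arena-style allocation, reclaimed in bulk, not per object
	fmt.Println(len(buf))

	pool.Return(a) // park the allocator for the next Get instead of freeing it
}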
1 change: 0 additions & 1 deletion badger/main.go
@@ -49,7 +49,6 @@ func main() {
z.Free(out)

cmd.Execute()
- z.Done()
fmt.Printf("Num Allocated Bytes at program end: %s\n",
humanize.IBytes(uint64(z.NumAllocBytes())))
if z.NumAllocBytes() > 0 {
4 changes: 4 additions & 0 deletions db.go
@@ -95,6 +95,7 @@ type DB struct {
registry *KeyRegistry
blockCache *ristretto.Cache
indexCache *ristretto.Cache
+ allocPool *z.AllocatorPool
}

const (
@@ -218,6 +219,7 @@ func Open(opt Options) (*DB, error) {
valueDirGuard: valueDirLockGuard,
orc: newOracle(opt),
pub: newPublisher(),
+ allocPool: z.NewAllocatorPool(8),
}
// Cleanup all the goroutines started by badger in case of an error.
defer func() {
@@ -476,6 +478,8 @@ func (db *DB) IsClosed() bool {
}

func (db *DB) close() (err error) {
+ defer db.allocPool.Release()
+
db.opt.Debugf("Closing database")
db.opt.Infof("Lifetime L0 stalled for: %s\n", time.Duration(db.lc.l0stallsMs))

2 changes: 1 addition & 1 deletion db_test.go
@@ -533,7 +533,7 @@ func TestGetMore(t *testing.T) {
data := func(i int) []byte {
return []byte(fmt.Sprintf("%b", i))
}
- n := 500000
+ n := 200000
m := 45 // Increasing would cause ErrTxnTooBig
for i := 0; i < n; i += m {
if (i % 10000) == 0 {
2 changes: 1 addition & 1 deletion go.mod
@@ -7,7 +7,7 @@ go 1.12
require (
github.com/DataDog/zstd v1.4.1
github.com/cespare/xxhash v1.1.0
- github.com/dgraph-io/ristretto v0.0.4-0.20201105000107-750f5be31aad
+ github.com/dgraph-io/ristretto v0.0.4-0.20201111190620-1040b7ded521
github.com/dustin/go-humanize v1.0.0
github.com/golang/protobuf v1.3.1
github.com/golang/snappy v0.0.1
4 changes: 2 additions & 2 deletions go.sum
@@ -15,8 +15,8 @@ github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwc
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
- github.com/dgraph-io/ristretto v0.0.4-0.20201105000107-750f5be31aad h1:BN2A+lqBwLx57/l1MUv3iKLIY8XddNkZg2zZN/BFM1I=
- github.com/dgraph-io/ristretto v0.0.4-0.20201105000107-750f5be31aad/go.mod h1:bDI4cDaalvYSji3vBVDKrn9ouDZrwN974u8ZO/AhYXs=
+ github.com/dgraph-io/ristretto v0.0.4-0.20201111190620-1040b7ded521 h1:81yY9QfuVfSVccqD7cLA/JohhMX+TQCfSjmGS7jUJCc=
+ github.com/dgraph-io/ristretto v0.0.4-0.20201111190620-1040b7ded521/go.mod h1:bDI4cDaalvYSji3vBVDKrn9ouDZrwN974u8ZO/AhYXs=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
1 change: 1 addition & 0 deletions options.go
@@ -177,6 +177,7 @@ func buildTableOptions(db *DB) table.Options {
ZSTDCompressionLevel: opt.ZSTDCompressionLevel,
BlockCache: db.blockCache,
IndexCache: db.indexCache,
+ AllocPool: db.allocPool,
DataKey: dk,
}
}
58 changes: 37 additions & 21 deletions stream.go
@@ -92,6 +92,8 @@ type Stream struct {
// Use allocators to generate KVs.
allocatorsMu sync.RWMutex
allocators map[int]*z.Allocator
+
+ allocPool *z.AllocatorPool
}

func (st *Stream) Allocator(threadId int) *z.Allocator {
@@ -182,15 +184,9 @@ func (st *Stream) produceRanges(ctx context.Context) {

func (st *Stream) newAllocator(threadId int) *z.Allocator {
st.allocatorsMu.Lock()
- var a *z.Allocator
- if cur, ok := st.allocators[threadId]; ok && cur.Size() == 0 {
- a = cur // Reuse.
- } else {
- // Current allocator has been used already. Create a new one.
- a = z.NewAllocator(batchSize)
- // a.Tag = fmt.Sprintf("Stream %d: %s", threadId, st.LogPrefix)
- st.allocators[threadId] = a
- }
+ a := st.allocPool.Get(batchSize)
+ a.Tag = "Stream " + st.LogPrefix
+ st.allocators[threadId] = a
st.allocatorsMu.Unlock()
return a
}
@@ -206,6 +202,12 @@ func (st *Stream) produceKVs(ctx context.Context, threadId int) error {
}
defer txn.Discard()

+ // produceKVs runs iterate serially, so we can define the outList here.
+ outList := new(pb.KVList)
+ // One last Allocator for this threadId will remain when produceKVs finishes;
+ // it is released by Orchestrate.
+ outList.AllocRef = st.newAllocator(threadId).Ref
+
iterate := func(kr keyRange) error {
iterOpts := DefaultIteratorOptions
iterOpts.AllVersions = true
@@ -218,9 +220,6 @@
// This unique stream id is used to identify all the keys from this iteration.
streamId := atomic.AddUint32(&st.nextStreamId, 1)

- outList := new(pb.KVList)
- outList.AllocRef = st.newAllocator(threadId).Ref
-
sendIt := func() error {
select {
case st.kvChan <- outList:
@@ -280,10 +279,7 @@
StreamDone: true,
})
}
- if len(outList.Kv) > 0 {
- return sendIt()
- }
- return nil
+ return sendIt()
}

for {
@@ -309,6 +305,16 @@ func (st *Stream) streamKVs(ctx context.Context) error {
defer t.Stop()
now := time.Now()

+ allocs := make(map[uint64]struct{})
+ returnAllocs := func() {
+ for ref := range allocs {
+ a := z.AllocatorFrom(ref)
+ st.allocPool.Return(a)
+ delete(allocs, ref)
+ }
+ }
+ defer returnAllocs()
+
sendBatch := func(batch *pb.KVList) error {
sz := uint64(proto.Size(batch))
bytesSent += sz
@@ -320,6 +326,7 @@
st.db.opt.Infof("%s Created batch of size: %s in %s.\n",
st.LogPrefix, humanize.Bytes(sz), time.Since(t))

+ returnAllocs()
return nil
}

@@ -340,6 +347,7 @@
}
y.AssertTrue(kvs != nil)
batch.Kv = append(batch.Kv, kvs.Kv...)
+ allocs[kvs.AllocRef] = struct{}{}

default:
break loop
@@ -372,6 +380,7 @@
}
y.AssertTrue(kvs != nil)
batch = kvs
+ allocs[kvs.AllocRef] = struct{}{}

// Otherwise, slurp more keys into this batch.
if err := slurp(batch); err != nil {
@@ -391,6 +400,17 @@
// are serial. In case any of these steps encounter an error, Orchestrate would stop execution and
// return that error. Orchestrate can be called multiple times, but in serial order.
func (st *Stream) Orchestrate(ctx context.Context) error {
+ st.allocPool = z.NewAllocatorPool(st.NumGo)
+ defer func() {
+ for _, a := range st.allocators {
+ // Using AllocatorFrom is better because if the allocator is already freed up, it
+ // would return nil.
+ a = z.AllocatorFrom(a.Ref)
+ a.Release()
+ }
+ st.allocPool.Release()
+ }()
+
st.rangeCh = make(chan keyRange, 3) // Contains keys for posting lists.

// kvChan should only have a small capacity to ensure that we don't buffer up too much data if
@@ -439,17 +459,13 @@

// Wait for key streaming to be over.
err := <-kvErr
-
- for _, a := range st.allocators {
- a.Release()
- }
return err
}

func (db *DB) newStream() *Stream {
return &Stream{
db: db,
- NumGo: 16,
+ NumGo: 8,
LogPrefix: "Badger.Stream",
allocators: make(map[int]*z.Allocator),
}
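A note on the hand-off visible above: each producer stamps its outgoing pb.KVList with the allocator's Ref (a uint64), streamKVs collects those refs and resolves them back to allocators so it can return them to the pool, and Orchestrate's deferred cleanup re-resolves refs before releasing, so a ref whose allocator is already gone yields nil instead of a double free. A self-contained sketch of that ref round-trip, using only z calls that appear in this diff; names and sizes are illustrative:

package main

import (
	"fmt"

	"github.com/dgraph-io/ristretto/z"
)

func main() {
	pool := z.NewAllocatorPool(2)
	defer pool.Release()

	a := pool.Get(1024)
	a.Tag = "demo"
	ref := a.Ref // what produceKVs stores in pb.KVList.AllocRef

	// Consumer side: resolve the ref, then hand the allocator back to the
	// pool once the batch it backs has been sent.
	owner := z.AllocatorFrom(ref)
	pool.Return(owner)

	// Release path: per the comment in Orchestrate, a ref to an allocator
	// that has already been freed resolves to nil rather than dangling.
	b := z.NewAllocator(1024)
	bref := b.Ref
	b.Release()
	fmt.Println(z.AllocatorFrom(bref) == nil) // expected: true
}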
52 changes: 52 additions & 0 deletions stream_test.go
@@ -25,6 +25,7 @@ import (
"strings"
"testing"

+ "github.com/dgraph-io/badger/v2/pb"
bpb "github.com/dgraph-io/badger/v2/pb"
"github.com/dgraph-io/badger/v2/y"
"github.com/golang/protobuf/proto"
@@ -265,3 +266,54 @@ func TestBigStream(t *testing.T) {
}
require.NoError(t, db.Close())
}

+ // There was a bug in the stream writer code that caused allocators to be freed
+ // twice when the default keyToList was not used. This test exercises that path.
+ func TestStreamCustomKeyToList(t *testing.T) {
+ dir, err := ioutil.TempDir("", "badger-test")
+ require.NoError(t, err)
+ defer removeDir(dir)
+
+ db, err := OpenManaged(DefaultOptions(dir))
+ require.NoError(t, err)
+
+ var count int
+ for _, key := range []string{"p0", "p1", "p2"} {
+ for i := 1; i <= 100; i++ {
+ txn := db.NewTransactionAt(math.MaxUint64, true)
+ require.NoError(t, txn.SetEntry(NewEntry([]byte(key), value(i))))
+ count++
+ require.NoError(t, txn.CommitAt(uint64(i), nil))
+ }
+ }
+
+ stream := db.NewStreamAt(math.MaxUint64)
+ stream.LogPrefix = "Testing"
+ stream.KeyToList = func(key []byte, itr *Iterator) (*pb.KVList, error) {
+ item := itr.Item()
+ val, err := item.ValueCopy(nil)
+ if err != nil {
+ return nil, err
+ }
+ kv := &pb.KV{
+ Key: y.Copy(item.Key()),
+ Value: val,
+ }
+ return &pb.KVList{
+ Kv: []*pb.KV{kv},
+ }, nil
+ }
+ res := map[string]struct{}{"p0": {}, "p1": {}, "p2": {}}
+ stream.Send = func(list *pb.KVList) error {
+ for _, kv := range list.Kv {
+ key := string(kv.Key)
+ if _, ok := res[key]; !ok {
+ panic(fmt.Sprintf("%s key not found", key))
+ }
+ delete(res, key)
+ }
+ return nil
+ }
+ require.NoError(t, stream.Orchestrate(ctxb))
+ require.Zero(t, len(res))
+ }
5 changes: 3 additions & 2 deletions stream_writer.go
@@ -270,6 +270,7 @@ func (sw *StreamWriter) Cancel() {
type sortedWriter struct {
db *DB
throttle *y.Throttle
+ opts table.Options

builder *table.Builder
lastKey []byte
@@ -286,6 +287,7 @@ func (sw *StreamWriter) newWriter(streamID uint32) (*sortedWriter, error) {
}
w := &sortedWriter{
db: sw.db,
+ opts: bopts,
streamID: streamID,
throttle: sw.throttle,
builder: table.NewTableBuilder(bopts),
@@ -379,8 +381,7 @@ func (w *sortedWriter) send(done bool) error {
return nil
}

- bopts := buildTableOptions(w.db)
- w.builder = table.NewTableBuilder(bopts)
+ w.builder = table.NewTableBuilder(w.opts)
return nil
}

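Finally, the sortedWriter change is a small hoisting optimization: newWriter now snapshots table.Options once, and send reuses that snapshot for every rotated builder, where previously buildTableOptions (which, per options.go above, wires up the caches and a data key) ran on every table flush. A self-contained sketch of the pattern with stand-in types, since the real table package is not part of this diff:

package main

import "fmt"

// Stand-ins for table.Options and table.Builder.
type Options struct{ BlockSize int }
type Builder struct{ opts Options }

func NewTableBuilder(o Options) *Builder { return &Builder{opts: o} }

// Like the patched sortedWriter: options are computed once at construction
// and reused each time the writer rotates to a fresh builder.
type sortedWriter struct {
	opts    Options
	builder *Builder
}

func newWriter(opts Options) *sortedWriter {
	return &sortedWriter{opts: opts, builder: NewTableBuilder(opts)}
}

func (w *sortedWriter) rotate() {
	w.builder = NewTableBuilder(w.opts) // reuse the cached options, do not rebuild
}

func main() {
	w := newWriter(Options{BlockSize: 4 << 10})
	w.rotate()
	fmt.Println(w.builder.opts.BlockSize)
}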