Skip to content

Commit

Permalink
Bug Fix: Fix up how we use z.Allocator
Browse files Browse the repository at this point in the history
Do not use AllocatorPool, because that has shown weird crashes due to
Go's interpretation of slices. The new system uses Go memory for
z.Allocator and avoids reusing it.
  • Loading branch information
manishrjain committed Nov 23, 2020
1 parent 5a96b2c commit 5ff9e1d
Show file tree
Hide file tree
Showing 7 changed files with 7 additions and 18 deletions.
4 changes: 0 additions & 4 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ type DB struct {
registry *KeyRegistry
blockCache *ristretto.Cache
indexCache *ristretto.Cache
allocPool *z.AllocatorPool
}

const (
Expand Down Expand Up @@ -219,7 +218,6 @@ func Open(opt Options) (*DB, error) {
valueDirGuard: valueDirLockGuard,
orc: newOracle(opt),
pub: newPublisher(),
allocPool: z.NewAllocatorPool(8),
}
// Cleanup all the goroutines started by badger in case of an error.
defer func() {
Expand Down Expand Up @@ -478,8 +476,6 @@ func (db *DB) IsClosed() bool {
}

func (db *DB) close() (err error) {
defer db.allocPool.Release()

db.opt.Debugf("Closing database")
db.opt.Infof("Lifetime L0 stalled for: %s\n", time.Duration(db.lc.l0stallsMs))

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ go 1.12
require (
github.com/DataDog/zstd v1.4.1
github.com/cespare/xxhash v1.1.0
github.com/dgraph-io/ristretto v0.0.4-0.20201118204411-eeefcb8bb4ef
github.com/dgraph-io/ristretto v0.0.4-0.20201123185045-68b18eb1b695
github.com/dustin/go-humanize v1.0.0
github.com/golang/protobuf v1.3.1
github.com/golang/snappy v0.0.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgraph-io/ristretto v0.0.4-0.20201118204411-eeefcb8bb4ef h1:+Q2Ow3+Ut07Q1LMDkIvzgyxk17o4u8by1JT7hTb+T1Y=
github.com/dgraph-io/ristretto v0.0.4-0.20201118204411-eeefcb8bb4ef/go.mod h1:tv2ec8nA7vRpSYX7/MbP52ihrUMXIHit54CQMq8npXQ=
github.com/dgraph-io/ristretto v0.0.4-0.20201123185045-68b18eb1b695 h1:UP7ZrWkI7Qnp0T2ejWq7HGJOfiTIUfLE58Jg862a1eQ=
github.com/dgraph-io/ristretto v0.0.4-0.20201123185045-68b18eb1b695/go.mod h1:tv2ec8nA7vRpSYX7/MbP52ihrUMXIHit54CQMq8npXQ=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
Expand Down
1 change: 0 additions & 1 deletion options.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,6 @@ func buildTableOptions(db *DB) table.Options {
ZSTDCompressionLevel: opt.ZSTDCompressionLevel,
BlockCache: db.blockCache,
IndexCache: db.indexCache,
AllocPool: db.allocPool,
DataKey: dk,
}
}
Expand Down
8 changes: 2 additions & 6 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,6 @@ type Stream struct {
// Use allocators to generate KVs.
allocatorsMu sync.RWMutex
allocators map[int]*z.Allocator

allocPool *z.AllocatorPool
}

func (st *Stream) Allocator(threadId int) *z.Allocator {
Expand Down Expand Up @@ -184,7 +182,7 @@ func (st *Stream) produceRanges(ctx context.Context) {

func (st *Stream) newAllocator(threadId int) *z.Allocator {
st.allocatorsMu.Lock()
a := st.allocPool.Get(batchSize)
a := z.NewAllocator(batchSize)
a.Tag = "Stream " + st.LogPrefix
st.allocators[threadId] = a
st.allocatorsMu.Unlock()
Expand Down Expand Up @@ -309,7 +307,7 @@ func (st *Stream) streamKVs(ctx context.Context) error {
returnAllocs := func() {
for ref := range allocs {
a := z.AllocatorFrom(ref)
st.allocPool.Return(a)
a.Release()
delete(allocs, ref)
}
}
Expand Down Expand Up @@ -400,15 +398,13 @@ outer:
// are serial. In case any of these steps encounter an error, Orchestrate would stop execution and
// return that error. Orchestrate can be called multiple times, but in serial order.
func (st *Stream) Orchestrate(ctx context.Context) error {
st.allocPool = z.NewAllocatorPool(st.NumGo)
defer func() {
for _, a := range st.allocators {
// Using AllocatorFrom is better because if the allocator is already freed up, it would
// return nil.
a = z.AllocatorFrom(a.Ref)
a.Release()
}
st.allocPool.Release()
}()

st.rangeCh = make(chan keyRange, 3) // Contains keys for posting lists.
Expand Down
4 changes: 2 additions & 2 deletions table/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func NewTableBuilder(opts Options) *Builder {
sz = maxAllocatorInitialSz
}
b := &Builder{
alloc: opts.AllocPool.Get(sz),
alloc: z.NewAllocator(sz),
opts: &opts,
}
b.alloc.Tag = "Builder"
Expand Down Expand Up @@ -186,7 +186,7 @@ func (b *Builder) handleBlock() {

// Close closes the TableBuilder.
func (b *Builder) Close() {
b.opts.AllocPool.Return(b.alloc)
b.alloc.Release()
}

// Empty returns whether it's empty.
Expand Down
2 changes: 0 additions & 2 deletions table/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,6 @@ type Options struct {
BlockCache *ristretto.Cache
IndexCache *ristretto.Cache

AllocPool *z.AllocatorPool

// ZSTDCompressionLevel is the ZSTD compression level used for compressing blocks.
ZSTDCompressionLevel int
}
Expand Down

0 comments on commit 5ff9e1d

Please sign in to comment.