From 5ff9e1dd1b89230b137154bd80ea03925710d7ab Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Mon, 23 Nov 2020 11:14:37 -0800 Subject: [PATCH] Bug Fix: Fix up how we use z.Allocator Do not use AllocatorPool, because that has shown weird crashes due to Go's interpretation of slices. The new system uses Go memory for z.Allocator and avoids reusing it. --- db.go | 4 ---- go.mod | 2 +- go.sum | 4 ++-- options.go | 1 - stream.go | 8 ++------ table/builder.go | 4 ++-- table/table.go | 2 -- 7 files changed, 7 insertions(+), 18 deletions(-) diff --git a/db.go b/db.go index befec1957..eb8a3b85b 100644 --- a/db.go +++ b/db.go @@ -95,7 +95,6 @@ type DB struct { registry *KeyRegistry blockCache *ristretto.Cache indexCache *ristretto.Cache - allocPool *z.AllocatorPool } const ( @@ -219,7 +218,6 @@ func Open(opt Options) (*DB, error) { valueDirGuard: valueDirLockGuard, orc: newOracle(opt), pub: newPublisher(), - allocPool: z.NewAllocatorPool(8), } // Cleanup all the goroutines started by badger in case of an error. defer func() { @@ -478,8 +476,6 @@ func (db *DB) IsClosed() bool { } func (db *DB) close() (err error) { - defer db.allocPool.Release() - db.opt.Debugf("Closing database") db.opt.Infof("Lifetime L0 stalled for: %s\n", time.Duration(db.lc.l0stallsMs)) diff --git a/go.mod b/go.mod index 6613c798c..0c43ba3ba 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ go 1.12 require ( github.com/DataDog/zstd v1.4.1 github.com/cespare/xxhash v1.1.0 - github.com/dgraph-io/ristretto v0.0.4-0.20201118204411-eeefcb8bb4ef + github.com/dgraph-io/ristretto v0.0.4-0.20201123185045-68b18eb1b695 github.com/dustin/go-humanize v1.0.0 github.com/golang/protobuf v1.3.1 github.com/golang/snappy v0.0.1 diff --git a/go.sum b/go.sum index 325f83d4a..7b01bad1f 100644 --- a/go.sum +++ b/go.sum @@ -17,8 +17,8 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/dgraph-io/ristretto v0.0.4-0.20201118204411-eeefcb8bb4ef h1:+Q2Ow3+Ut07Q1LMDkIvzgyxk17o4u8by1JT7hTb+T1Y= -github.com/dgraph-io/ristretto v0.0.4-0.20201118204411-eeefcb8bb4ef/go.mod h1:tv2ec8nA7vRpSYX7/MbP52ihrUMXIHit54CQMq8npXQ= +github.com/dgraph-io/ristretto v0.0.4-0.20201123185045-68b18eb1b695 h1:UP7ZrWkI7Qnp0T2ejWq7HGJOfiTIUfLE58Jg862a1eQ= +github.com/dgraph-io/ristretto v0.0.4-0.20201123185045-68b18eb1b695/go.mod h1:tv2ec8nA7vRpSYX7/MbP52ihrUMXIHit54CQMq8npXQ= github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA= github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= diff --git a/options.go b/options.go index ede01d2e4..33e2c4865 100644 --- a/options.go +++ b/options.go @@ -177,7 +177,6 @@ func buildTableOptions(db *DB) table.Options { ZSTDCompressionLevel: opt.ZSTDCompressionLevel, BlockCache: db.blockCache, IndexCache: db.indexCache, - AllocPool: db.allocPool, DataKey: dk, } } diff --git a/stream.go b/stream.go index 972226e4c..63e949166 100644 --- a/stream.go +++ b/stream.go @@ -92,8 +92,6 @@ type Stream struct { // Use allocators to generate KVs. allocatorsMu sync.RWMutex allocators map[int]*z.Allocator - - allocPool *z.AllocatorPool } func (st *Stream) Allocator(threadId int) *z.Allocator { @@ -184,7 +182,7 @@ func (st *Stream) produceRanges(ctx context.Context) { func (st *Stream) newAllocator(threadId int) *z.Allocator { st.allocatorsMu.Lock() - a := st.allocPool.Get(batchSize) + a := z.NewAllocator(batchSize) a.Tag = "Stream " + st.LogPrefix st.allocators[threadId] = a st.allocatorsMu.Unlock() @@ -309,7 +307,7 @@ func (st *Stream) streamKVs(ctx context.Context) error { returnAllocs := func() { for ref := range allocs { a := z.AllocatorFrom(ref) - st.allocPool.Return(a) + a.Release() delete(allocs, ref) } } @@ -400,7 +398,6 @@ outer: // are serial. In case any of these steps encounter an error, Orchestrate would stop execution and // return that error. Orchestrate can be called multiple times, but in serial order. func (st *Stream) Orchestrate(ctx context.Context) error { - st.allocPool = z.NewAllocatorPool(st.NumGo) defer func() { for _, a := range st.allocators { // Using AllocatorFrom is better because if the allocator is already freed up, it would @@ -408,7 +405,6 @@ func (st *Stream) Orchestrate(ctx context.Context) error { a = z.AllocatorFrom(a.Ref) a.Release() } - st.allocPool.Release() }() st.rangeCh = make(chan keyRange, 3) // Contains keys for posting lists. diff --git a/table/builder.go b/table/builder.go index 5f400385f..39bb166c8 100644 --- a/table/builder.go +++ b/table/builder.go @@ -127,7 +127,7 @@ func NewTableBuilder(opts Options) *Builder { sz = maxAllocatorInitialSz } b := &Builder{ - alloc: opts.AllocPool.Get(sz), + alloc: z.NewAllocator(sz), opts: &opts, } b.alloc.Tag = "Builder" @@ -186,7 +186,7 @@ func (b *Builder) handleBlock() { // Close closes the TableBuilder. func (b *Builder) Close() { - b.opts.AllocPool.Return(b.alloc) + b.alloc.Release() } // Empty returns whether it's empty. diff --git a/table/table.go b/table/table.go index 0ff60c22b..17cc31f58 100644 --- a/table/table.go +++ b/table/table.go @@ -80,8 +80,6 @@ type Options struct { BlockCache *ristretto.Cache IndexCache *ristretto.Cache - AllocPool *z.AllocatorPool - // ZSTDCompressionLevel is the ZSTD compression level used for compressing blocks. ZSTDCompressionLevel int }