Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Opt(stream): Use z.Buffer to stream data #1606

Merged
merged 19 commits into from
Nov 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/dgraph-io/badger/v2/pb"
"github.com/dgraph-io/badger/v2/y"
"github.com/dgraph-io/ristretto/z"
"github.com/golang/protobuf/proto"
)

Expand Down Expand Up @@ -115,7 +116,11 @@ func (stream *Stream) Backup(w io.Writer, since uint64) (uint64, error) {
}

var maxVersion uint64
stream.Send = func(list *pb.KVList) error {
stream.Send = func(buf *z.Buffer) error {
list, err := BufferToKVList(buf)
if err != nil {
return err
}
out := list.Kv[:0]
for _, kv := range list.Kv {
if maxVersion < kv.Version {
Expand Down
15 changes: 9 additions & 6 deletions badger/cmd/bank.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/dgraph-io/badger/v2"
"github.com/dgraph-io/badger/v2/pb"
"github.com/dgraph-io/badger/v2/y"
"github.com/dgraph-io/ristretto/z"
"github.com/spf13/cobra"
)

Expand Down Expand Up @@ -359,7 +360,7 @@ func runTest(cmd *cobra.Command, args []string) error {
WithNumVersionsToKeep(int(math.MaxInt32)).
WithBlockCacheSize(1 << 30).
WithIndexCacheSize(1 << 30)

if verbose {
opts = opts.WithLoggingLevel(badger.DEBUG)
}
Expand Down Expand Up @@ -498,13 +499,15 @@ func runTest(cmd *cobra.Command, args []string) error {
batch := tmpDb.NewWriteBatch()

stream := db.NewStream()
stream.Send = func(list *pb.KVList) error {
for _, kv := range list.Kv {
if err := batch.Set(kv.Key, kv.Value); err != nil {
stream.Send = func(buf *z.Buffer) error {
err := buf.SliceIterate(func(s []byte) error {
var kv pb.KV
if err := kv.Unmarshal(s); err != nil {
return err
}
}
return nil
return batch.Set(kv.Key, kv.Value)
})
return err
}
y.Check(stream.Orchestrate(context.Background()))
y.Check(batch.Flush())
Expand Down
22 changes: 16 additions & 6 deletions badger/cmd/read_bench.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"time"

humanize "github.com/dustin/go-humanize"
"github.com/pkg/errors"
"github.com/spf13/cobra"

"github.com/dgraph-io/badger/v2"
Expand Down Expand Up @@ -79,7 +80,7 @@ func init() {

// Scan the whole database using the iterators
func fullScanDB(db *badger.DB) {
txn := db.NewTransaction(false)
txn := db.NewTransactionAt(math.MaxUint64, false)
defer txn.Discard()

startTime = time.Now()
Expand Down Expand Up @@ -111,7 +112,7 @@ func readBench(cmd *cobra.Command, args []string) error {
WithBlockCacheSize(blockCacheSize << 20).
WithIndexCacheSize(indexCacheSize << 20)
fmt.Printf("Opening badger with options = %+v\n", opt)
db, err := badger.Open(opt)
db, err := badger.OpenManaged(opt)
if err != nil {
return y.Wrapf(err, "unable to open DB")
}
Expand Down Expand Up @@ -205,21 +206,30 @@ func getSampleKeys(db *badger.DB) ([][]byte, error) {
return l, nil
}

errStop := errors.Errorf("Stop iterating")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stream.Send = func(l *pb.KVList) error {
stream.Send = func(buf *z.Buffer) error {
if count >= sampleSize {
return nil
}
for _, kv := range l.Kv {
err := buf.SliceIterate(func(s []byte) error {
var kv pb.KV
if err := kv.Unmarshal(s); err != nil {
return err
}
keys = append(keys, kv.Key)
count++
if count >= sampleSize {
cancel()
return nil
return errStop
}
return nil
})
if err == errStop || err == nil {
return nil
}
return nil
return err
}

if err := stream.Orchestrate(ctx); err != nil && err != context.Canceled {
Expand Down
19 changes: 11 additions & 8 deletions badger/cmd/write_bench.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,37 +204,38 @@ func writeSorted(db *badger.DB, num uint64) error {
}

wg := &sync.WaitGroup{}
writeCh := make(chan *pb.KVList, 3)
writeCh := make(chan *z.Buffer, 3)
writeRange := func(start, end uint64, streamId uint32) {
// end is not included.
defer wg.Done()
kvs := &pb.KVList{}
kvBuf := z.NewBuffer(5 << 20)
var sz int
for i := start; i < end; i++ {
key := make([]byte, 8)
binary.BigEndian.PutUint64(key, i)
kvs.Kv = append(kvs.Kv, &pb.KV{
kv := &pb.KV{
Key: key,
Value: value,
Version: 1,
StreamId: streamId,
})
}
badger.KVToBuffer(kv, kvBuf)

sz += es
atomic.AddUint64(&entriesWritten, 1)
atomic.AddUint64(&sizeWritten, uint64(es))

if sz >= 4<<20 { // 4 MB
writeCh <- kvs
kvs = &pb.KVList{}
writeCh <- kvBuf
kvBuf = z.NewBuffer(1 << 20)
sz = 0
}
}
writeCh <- kvs
writeCh <- kvBuf
}

// Let's create some streams.
width := num / 16
width := num / 4
streamID := uint32(0)
for start := uint64(0); start < num; start += width {
end := start + width
Expand All @@ -254,6 +255,7 @@ func writeSorted(db *badger.DB, num uint64) error {
if err := writer.Write(kvs); err != nil {
panic(err)
}
y.Check(kvs.Release())
}
log.Println("DONE streaming. Flushing...")
return writer.Flush()
Expand Down Expand Up @@ -395,6 +397,7 @@ func reportStats(c *z.Closer, db *badger.DB) {
y.FixedDuration(time.Since(startTime)),
humanize.Bytes(sz), humanize.Bytes(bytesRate), entries, entriesRate,
humanize.IBytes(uint64(z.NumAllocBytes())))

if count%10 == 0 {
fmt.Printf(db.LevelsToString())
}
Expand Down
35 changes: 27 additions & 8 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/dgraph-io/badger/v2/pb"
"github.com/dgraph-io/badger/v2/y"
"github.com/dgraph-io/ristretto/z"
"github.com/pkg/errors"
)

Expand Down Expand Up @@ -95,17 +96,35 @@ func (wb *WriteBatch) callback(err error) {
wb.err.Store(err)
}

func (wb *WriteBatch) Write(kvList *pb.KVList) error {
// writeKV turns a single pb.KV into an Entry and passes it to handleEntry,
// returning whatever error handleEntry reports. Only the first byte of
// kv.UserMeta (if any) is copied into the entry. The KV must carry a non-zero
// version; this is enforced with y.AssertTrue.
func (wb *WriteBatch) writeKV(kv *pb.KV) error {
	y.AssertTrue(kv.Version != 0)

	entry := &Entry{
		Key:     kv.Key,
		Value:   kv.Value,
		version: kv.Version,
	}
	if meta := kv.UserMeta; len(meta) > 0 {
		entry.UserMeta = meta[0]
	}
	return wb.handleEntry(entry)
}

func (wb *WriteBatch) Write(buf *z.Buffer) error {
wb.Lock()
defer wb.Unlock()
for _, kv := range kvList.Kv {
e := Entry{Key: kv.Key, Value: kv.Value}
if len(kv.UserMeta) > 0 {
e.UserMeta = kv.UserMeta[0]

err := buf.SliceIterate(func(s []byte) error {
kv := &pb.KV{}
if err := kv.Unmarshal(s); err != nil {
return err
}
y.AssertTrue(kv.Version != 0)
e.version = kv.Version
if err := wb.handleEntry(&e); err != nil {
return wb.writeKV(kv)
})
return err
}

func (wb *WriteBatch) WriteList(kvList *pb.KVList) error {
wb.Lock()
defer wb.Unlock()
for _, kv := range kvList.Kv {
if err := wb.writeKV(kv); err != nil {
return err
}
}
Expand Down
8 changes: 6 additions & 2 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ type DB struct {
registry *KeyRegistry
blockCache *ristretto.Cache
indexCache *ristretto.Cache
allocPool *z.AllocatorPool
}

const (
Expand Down Expand Up @@ -218,6 +219,7 @@ func Open(opt Options) (*DB, error) {
valueDirGuard: valueDirLockGuard,
orc: newOracle(opt),
pub: newPublisher(),
allocPool: z.NewAllocatorPool(8),
}
// Cleanup all the goroutines started by badger in case of an error.
defer func() {
Expand Down Expand Up @@ -476,6 +478,8 @@ func (db *DB) IsClosed() bool {
}

func (db *DB) close() (err error) {
defer db.allocPool.Release()

db.opt.Debugf("Closing database")
db.opt.Infof("Lifetime L0 stalled for: %s\n", time.Duration(atomic.LoadInt64(&db.lc.l0stallsMs)))

Expand Down Expand Up @@ -1779,8 +1783,8 @@ func (db *DB) StreamDB(outOptions Options) error {
stream := db.NewStreamAt(math.MaxUint64)
stream.LogPrefix = fmt.Sprintf("Streaming DB to new DB at %s", outDir)

stream.Send = func(kvs *pb.KVList) error {
return writer.Write(kvs)
stream.Send = func(buf *z.Buffer) error {
return writer.Write(buf)
}
if err := stream.Orchestrate(context.Background()); err != nil {
return y.Wrapf(err, "cannot stream DB to out DB at %s", outDir)
Expand Down
22 changes: 19 additions & 3 deletions db2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -969,8 +969,20 @@ func TestKeyCount(t *testing.T) {
wg.Wait()
close(writeCh)
}()

// write serializes every KV in kvs into a temporary z.Buffer and hands that
// buffer to writer.Write. The buffer is released when write returns.
write := func(kvs *pb.KVList) error {
	buf := z.NewBuffer(1 << 20)
	defer buf.Release()

	for _, kv := range kvs.Kv {
		KVToBuffer(kv, buf)
	}
	// Return the writer's error instead of discarding it, so a failed
	// write is reported to the caller rather than silently ignored.
	return writer.Write(buf)
}

for kvs := range writeCh {
require.NoError(t, writer.Write(kvs))
require.NoError(t, write(kvs))
}
require.NoError(t, writer.Flush())
}
Expand Down Expand Up @@ -999,8 +1011,11 @@ func TestKeyCount(t *testing.T) {

streams := make(map[uint32]int)
stream := db2.NewStream()
stream.Send = func(list *pb.KVList) error {
count += len(list.Kv)
stream.Send = func(buf *z.Buffer) error {
list, err := BufferToKVList(buf)
if err != nil {
return err
}
for _, kv := range list.Kv {
last := streams[kv.StreamId]
key := binary.BigEndian.Uint64(kv.Key)
Expand All @@ -1010,6 +1025,7 @@ func TestKeyCount(t *testing.T) {
}
streams[kv.StreamId] = int(key)
}
count += len(list.Kv)
return nil
}
require.NoError(t, stream.Orchestrate(context.Background()))
Expand Down
11 changes: 7 additions & 4 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/dgraph-io/badger/v2/options"
"github.com/dgraph-io/badger/v2/pb"
"github.com/dgraph-io/badger/v2/y"
"github.com/dgraph-io/ristretto/z"
)

// summary is produced when DB is closed. Currently it is used only for testing.
Expand Down Expand Up @@ -2088,25 +2089,27 @@ func TestVerifyChecksum(t *testing.T) {
runBadgerTest(t, &opt, func(t *testing.T, db *DB) {
value := make([]byte, 32)
y.Check2(rand.Read(value))
l := &pb.KVList{}
st := 0

buf := z.NewBuffer(10 << 20)
defer buf.Release()
for i := 0; i < 1000; i++ {
key := make([]byte, 8)
binary.BigEndian.PutUint64(key, uint64(i))
l.Kv = append(l.Kv, &pb.KV{
KVToBuffer(&pb.KV{
Key: key,
Value: value,
StreamId: uint32(st),
Version: 1,
})
}, buf)
if i%100 == 0 {
st++
}
}

sw := db.NewStreamWriter()
require.NoError(t, sw.Prepare(), "sw.Prepare() failed")
require.NoError(t, sw.Write(l), "sw.Write() failed")
require.NoError(t, sw.Write(buf), "sw.Write() failed")
require.NoError(t, sw.Flush(), "sw.Flush() failed")

require.NoError(t, db.VerifyChecksum(), "checksum verification failed for DB")
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ go 1.12
require (
github.com/DataDog/zstd v1.4.1
github.com/cespare/xxhash v1.1.0
github.com/dgraph-io/ristretto v0.0.4-0.20201123185045-68b18eb1b695
github.com/dgraph-io/ristretto v0.0.4-0.20201125174811-766bca5e9938
github.com/dustin/go-humanize v1.0.0
github.com/golang/protobuf v1.3.1
github.com/golang/snappy v0.0.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgraph-io/ristretto v0.0.4-0.20201123185045-68b18eb1b695 h1:UP7ZrWkI7Qnp0T2ejWq7HGJOfiTIUfLE58Jg862a1eQ=
github.com/dgraph-io/ristretto v0.0.4-0.20201123185045-68b18eb1b695/go.mod h1:tv2ec8nA7vRpSYX7/MbP52ihrUMXIHit54CQMq8npXQ=
github.com/dgraph-io/ristretto v0.0.4-0.20201125174811-766bca5e9938 h1:FdSJif9oUVeH+MpsScsrL6OAbdW0pUYvXmkdhDSWWcQ=
github.com/dgraph-io/ristretto v0.0.4-0.20201125174811-766bca5e9938/go.mod h1:tv2ec8nA7vRpSYX7/MbP52ihrUMXIHit54CQMq8npXQ=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
Expand Down
1 change: 1 addition & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ func buildTableOptions(db *DB) table.Options {
ZSTDCompressionLevel: opt.ZSTDCompressionLevel,
BlockCache: db.blockCache,
IndexCache: db.indexCache,
AllocPool: db.allocPool,
DataKey: dk,
}
}
Expand Down
Loading