From 34f2d04d8a500c2f2ab67f4f999cfe51f6e7f16b Mon Sep 17 00:00:00 2001 From: Naman Jain Date: Thu, 15 Oct 2020 14:54:37 +0530 Subject: [PATCH] test(wal): add test for stream-writer with jemalloc enabled and discard stats (#1557) --- db2_test.go | 106 ++++++++++++++++++++++++++++++++++++++++++++++++ discard.go | 1 - discard_test.go | 27 ++++++++++++ test.sh | 27 ++++++++++++ value.go | 2 - value_test.go | 57 ++++++++++++++++++++++++++ 6 files changed, 217 insertions(+), 3 deletions(-) diff --git a/db2_test.go b/db2_test.go index afecd1b3e..ebf7b3751 100644 --- a/db2_test.go +++ b/db2_test.go @@ -33,6 +33,8 @@ import ( "testing" "time" + "github.com/dgraph-io/badger/v2/options" + "github.com/dgraph-io/badger/v2/pb" "github.com/dgraph-io/badger/v2/table" "github.com/dgraph-io/badger/v2/y" @@ -904,3 +906,107 @@ func TestTxnReadTs(t *testing.T) { require.NoError(t, err) require.Equal(t, 1, int(db.orc.readTs())) } + +// This tests failed for stream writer with jemalloc and compression enabled. +func TestKeyCount(t *testing.T) { + if !*manual { + t.Skip("Skipping test meant to be run manually.") + return + } + + writeSorted := func(db *DB, num uint64) { + valSz := 128 + value := make([]byte, valSz) + y.Check2(rand.Read(value)) + es := 8 + valSz // key size is 8 bytes and value size is valSz + + writer := db.NewStreamWriter() + require.NoError(t, writer.Prepare()) + + wg := &sync.WaitGroup{} + writeCh := make(chan *pb.KVList, 3) + writeRange := func(start, end uint64, streamId uint32) { + // end is not included. + defer wg.Done() + kvs := &pb.KVList{} + var sz int + for i := start; i < end; i++ { + key := make([]byte, 8) + binary.BigEndian.PutUint64(key, i) + kvs.Kv = append(kvs.Kv, &pb.KV{ + Key: key, + Value: value, + Version: 1, + StreamId: streamId, + }) + + sz += es + + if sz >= 4<<20 { // 4 MB + writeCh <- kvs + kvs = &pb.KVList{} + sz = 0 + } + } + writeCh <- kvs + } + + // Let's create some streams. + width := num / 16 + streamID := uint32(0) + for start := uint64(0); start < num; start += width { + end := start + width + if end > num { + end = num + } + streamID++ + wg.Add(1) + go writeRange(start, end, streamID) + } + go func() { + wg.Wait() + close(writeCh) + }() + for kvs := range writeCh { + require.NoError(t, writer.Write(kvs)) + } + require.NoError(t, writer.Flush()) + } + + N := uint64(10 * 1e6) // 10 million entries + dir, err := ioutil.TempDir("", "badger-test") + require.NoError(t, err) + defer removeDir(dir) + opt := DefaultOptions(dir). + WithBlockCacheSize(100 << 20). + WithCompression(options.ZSTD) + + db, err := Open(opt) + y.Check(err) + defer db.Close() + writeSorted(db, N) + require.NoError(t, db.Close()) + + // Read the db + db2, err := Open(DefaultOptions(dir)) + y.Check(err) + defer db.Close() + lastKey := -1 + count := 0 + db2.View(func(txn *Txn) error { + iopt := DefaultIteratorOptions + iopt.AllVersions = true + it := txn.NewIterator(iopt) + defer it.Close() + for it.Rewind(); it.Valid(); it.Next() { + count++ + i := it.Item() + key := binary.BigEndian.Uint64(i.Key()) + // The following should happen as we're writing sorted data. + require.Equalf(t, lastKey+1, int(key), "Expected key: %d, Found Key: %d", lastKey+1, int(key)) + lastKey = int(key) + } + return nil + }) + require.Equal(t, N, uint64(count)) +} diff --git a/discard.go b/discard.go index 7b2d3d55f..ba15191fd 100644 --- a/discard.go +++ b/discard.go @@ -41,7 +41,6 @@ const discardFname string = "DISCARD" const discardFsize int = 1 << 30 const maxSlot int = 64 << 20 -// TODO(naman): Add a test for this. func initDiscardStats(opt Options) (*discardStats, error) { fname := path.Join(opt.ValueDir, discardFname) diff --git a/discard_test.go b/discard_test.go index 351273fe1..28c70a2df 100644 --- a/discard_test.go +++ b/discard_test.go @@ -31,6 +31,10 @@ func TestDiscardStats(t *testing.T) { opt := DefaultOptions(dir) ds, err := initDiscardStats(opt) require.NoError(t, err) + require.Zero(t, ds.nextEmptySlot) + fid, _ := ds.MaxDiscard() + require.Zero(t, fid) + for i := uint32(0); i < 20; i++ { require.Equal(t, int64(i*100), ds.Update(i, int64(i*100))) } @@ -48,3 +52,26 @@ func TestDiscardStats(t *testing.T) { require.Equal(t, int(id*100), int(val)) }) } + +func TestReloadDiscardStats(t *testing.T) { + dir, err := ioutil.TempDir("", "badger-test") + require.NoError(t, err) + defer removeDir(dir) + + opt := DefaultOptions(dir) + db, err := Open(opt) + require.NoError(t, err) + ds := db.vlog.discardStats + + ds.Update(uint32(1), 1) + ds.Update(uint32(2), 1) + ds.Update(uint32(1), -1) + require.NoError(t, db.Close()) + + // Reopen the DB, discard stats should be same. + db2, err := Open(opt) + require.NoError(t, err) + ds2 := db2.vlog.discardStats + require.Zero(t, ds2.Update(uint32(1), 0)) + require.Equal(t, 1, int(ds2.Update(uint32(2), 0))) +} diff --git a/test.sh b/test.sh index fdf899e48..f7ea5606d 100755 --- a/test.sh +++ b/test.sh @@ -10,6 +10,29 @@ if [[ ! -z "$TEAMCITY_VERSION" ]]; then export GOFLAGS="-json" fi +function InstallJemalloc() { + pushd . + if [ ! -f /usr/local/lib/libjemalloc.a ]; then + USER_ID=`id -u` + JEMALLOC_URL="https://github.com/jemalloc/jemalloc/releases/download/5.2.1/jemalloc-5.2.1.tar.bz2" + + mkdir -p /tmp/jemalloc-temp && cd /tmp/jemalloc-temp ; + echo "Downloading jemalloc" ; + curl -s -L ${JEMALLOC_URL} -o jemalloc.tar.bz2 ; + tar xjf ./jemalloc.tar.bz2 ; + cd jemalloc-5.2.1 ; + ./configure --with-jemalloc-prefix='je_' ; + make ; + if [ "$USER_ID" -eq "0" ]; then + make install ; + else + echo "==== Need sudo access to install jemalloc" ; + sudo make install ; + fi + fi + popd +} + # Ensure that we can compile the binary. pushd badger go build -v . @@ -30,6 +53,10 @@ go test -v $tags -run='TestTruncateVlogNoClose2$' --manual=true go test -v $tags -run='TestTruncateVlogNoClose3$' --manual=true rm -rf p +InstallJemalloc +# Run the key count test for stream writer. +go test -v -tags jemalloc -run='TestKeyCount' --manual=true + # Run the normal tests. echo "==> Starting tests.. " # go test -timeout=25m -v -race github.com/dgraph-io/badger/v2/... diff --git a/value.go b/value.go index ddedc111d..8b62db192 100644 --- a/value.go +++ b/value.go @@ -853,7 +853,6 @@ func (vlog *valueLog) write(reqs []*request) error { if err != nil { return err } - // TODO(Naman): Add a test to check that meta bits are correctly set? // Restore the meta. e.meta = tmpMeta @@ -973,7 +972,6 @@ func (vlog *valueLog) pickLog(discardRatio float64) *logFile { defer vlog.filesLock.RUnlock() LOOP: - // TODO(naman): Add a test for MaxDiscard which checks for the zero value. // Pick a candidate that contains the largest amount of discardable data fid, discard := vlog.discardStats.MaxDiscard() diff --git a/value_test.go b/value_test.go index c351da41d..6ba065d61 100644 --- a/value_test.go +++ b/value_test.go @@ -1150,3 +1150,60 @@ func TestValidateWrite(t *testing.T) { err = log.validateWrites([]*request{req1, req}) require.Error(t, err) } + +func TestValueLogMeta(t *testing.T) { + dir, err := ioutil.TempDir("", "badger-test") + y.Check(err) + defer removeDir(dir) + + opt := getTestOptions(dir).WithValueThreshold(16) + db, _ := Open(opt) + defer db.Close() + txn := db.NewTransaction(true) + for i := 0; i < 10; i++ { + k := []byte(fmt.Sprintf("key=%d", i)) + v := []byte(fmt.Sprintf("val=%020d", i)) + require.NoError(t, txn.SetEntry(NewEntry(k, v))) + } + require.NoError(t, txn.Commit()) + fids := db.vlog.sortedFids() + require.Equal(t, 1, len(fids)) + + // vlog entries must not have txn meta. + db.vlog.filesMap[fids[0]].iterate(true, 0, func(e Entry, vp valuePointer) error { + require.Zero(t, e.meta&(bitTxn|bitFinTxn)) + return nil + }) + + // Entries in LSM tree must have txn bit of meta set + txn = db.NewTransaction(false) + defer txn.Discard() + iopt := DefaultIteratorOptions + key := []byte("key") + iopt.Prefix = key + itr := txn.NewIterator(iopt) + defer itr.Close() + var count int + for itr.Seek(key); itr.ValidForPrefix(key); itr.Next() { + item := itr.Item() + require.Equal(t, bitTxn, item.meta&(bitTxn|bitFinTxn)) + count++ + } + require.Equal(t, 10, count) +} + +// This tests asserts the condition that vlog fids start from 1. +// TODO(naman): should this be changed to assert instead? +func TestFirstVlogFile(t *testing.T) { + dir, err := ioutil.TempDir("", "badger-test") + require.NoError(t, err) + defer removeDir(dir) + + opt := DefaultOptions(dir) + db, err := Open(opt) + defer db.Close() + + fids := db.vlog.sortedFids() + require.NotZero(t, len(fids)) + require.Equal(t, uint32(1), fids[0]) +}