Skip to content

Commit

Permalink
test(wal): add test for stream-writer with jemalloc enabled and disca…
Browse files Browse the repository at this point in the history
…rd stats (#1557)
  • Loading branch information
NamanJain8 authored Oct 15, 2020
1 parent 9c48993 commit 34f2d04
Show file tree
Hide file tree
Showing 6 changed files with 217 additions and 3 deletions.
106 changes: 106 additions & 0 deletions db2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import (
"testing"
"time"

"github.com/dgraph-io/badger/v2/options"

"github.com/dgraph-io/badger/v2/pb"
"github.com/dgraph-io/badger/v2/table"
"github.com/dgraph-io/badger/v2/y"
Expand Down Expand Up @@ -904,3 +906,107 @@ func TestTxnReadTs(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 1, int(db.orc.readTs()))
}

// This tests failed for stream writer with jemalloc and compression enabled.
func TestKeyCount(t *testing.T) {
if !*manual {
t.Skip("Skipping test meant to be run manually.")
return
}

writeSorted := func(db *DB, num uint64) {
valSz := 128
value := make([]byte, valSz)
y.Check2(rand.Read(value))
es := 8 + valSz // key size is 8 bytes and value size is valSz

writer := db.NewStreamWriter()
require.NoError(t, writer.Prepare())

wg := &sync.WaitGroup{}
writeCh := make(chan *pb.KVList, 3)
writeRange := func(start, end uint64, streamId uint32) {
// end is not included.
defer wg.Done()
kvs := &pb.KVList{}
var sz int
for i := start; i < end; i++ {
key := make([]byte, 8)
binary.BigEndian.PutUint64(key, i)
kvs.Kv = append(kvs.Kv, &pb.KV{
Key: key,
Value: value,
Version: 1,
StreamId: streamId,
})

sz += es

if sz >= 4<<20 { // 4 MB
writeCh <- kvs
kvs = &pb.KVList{}
sz = 0
}
}
writeCh <- kvs
}

// Let's create some streams.
width := num / 16
streamID := uint32(0)
for start := uint64(0); start < num; start += width {
end := start + width
if end > num {
end = num
}
streamID++
wg.Add(1)
go writeRange(start, end, streamID)
}
go func() {
wg.Wait()
close(writeCh)
}()
for kvs := range writeCh {
require.NoError(t, writer.Write(kvs))
}
require.NoError(t, writer.Flush())
}

N := uint64(10 * 1e6) // 10 million entries
dir, err := ioutil.TempDir("", "badger-test")
require.NoError(t, err)
defer removeDir(dir)
opt := DefaultOptions(dir).
WithBlockCacheSize(100 << 20).
WithCompression(options.ZSTD)

db, err := Open(opt)
y.Check(err)
defer db.Close()
writeSorted(db, N)
require.NoError(t, db.Close())

// Read the db
db2, err := Open(DefaultOptions(dir))
y.Check(err)
defer db.Close()
lastKey := -1
count := 0
db2.View(func(txn *Txn) error {
iopt := DefaultIteratorOptions
iopt.AllVersions = true
it := txn.NewIterator(iopt)
defer it.Close()
for it.Rewind(); it.Valid(); it.Next() {
count++
i := it.Item()
key := binary.BigEndian.Uint64(i.Key())
// The following should happen as we're writing sorted data.
require.Equalf(t, lastKey+1, int(key), "Expected key: %d, Found Key: %d", lastKey+1, int(key))
lastKey = int(key)
}
return nil
})
require.Equal(t, N, uint64(count))
}
1 change: 0 additions & 1 deletion discard.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ const discardFname string = "DISCARD"
const discardFsize int = 1 << 30
const maxSlot int = 64 << 20

// TODO(naman): Add a test for this.
func initDiscardStats(opt Options) (*discardStats, error) {
fname := path.Join(opt.ValueDir, discardFname)

Expand Down
27 changes: 27 additions & 0 deletions discard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ func TestDiscardStats(t *testing.T) {
opt := DefaultOptions(dir)
ds, err := initDiscardStats(opt)
require.NoError(t, err)
require.Zero(t, ds.nextEmptySlot)
fid, _ := ds.MaxDiscard()
require.Zero(t, fid)

for i := uint32(0); i < 20; i++ {
require.Equal(t, int64(i*100), ds.Update(i, int64(i*100)))
}
Expand All @@ -48,3 +52,26 @@ func TestDiscardStats(t *testing.T) {
require.Equal(t, int(id*100), int(val))
})
}

func TestReloadDiscardStats(t *testing.T) {
dir, err := ioutil.TempDir("", "badger-test")
require.NoError(t, err)
defer removeDir(dir)

opt := DefaultOptions(dir)
db, err := Open(opt)
require.NoError(t, err)
ds := db.vlog.discardStats

ds.Update(uint32(1), 1)
ds.Update(uint32(2), 1)
ds.Update(uint32(1), -1)
require.NoError(t, db.Close())

// Reopen the DB, discard stats should be same.
db2, err := Open(opt)
require.NoError(t, err)
ds2 := db2.vlog.discardStats
require.Zero(t, ds2.Update(uint32(1), 0))
require.Equal(t, 1, int(ds2.Update(uint32(2), 0)))
}
27 changes: 27 additions & 0 deletions test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,29 @@ if [[ ! -z "$TEAMCITY_VERSION" ]]; then
export GOFLAGS="-json"
fi

function InstallJemalloc() {
pushd .
if [ ! -f /usr/local/lib/libjemalloc.a ]; then
USER_ID=`id -u`
JEMALLOC_URL="https://github.com/jemalloc/jemalloc/releases/download/5.2.1/jemalloc-5.2.1.tar.bz2"

mkdir -p /tmp/jemalloc-temp && cd /tmp/jemalloc-temp ;
echo "Downloading jemalloc" ;
curl -s -L ${JEMALLOC_URL} -o jemalloc.tar.bz2 ;
tar xjf ./jemalloc.tar.bz2 ;
cd jemalloc-5.2.1 ;
./configure --with-jemalloc-prefix='je_' ;
make ;
if [ "$USER_ID" -eq "0" ]; then
make install ;
else
echo "==== Need sudo access to install jemalloc" ;
sudo make install ;
fi
fi
popd
}

# Ensure that we can compile the binary.
pushd badger
go build -v .
Expand All @@ -30,6 +53,10 @@ go test -v $tags -run='TestTruncateVlogNoClose2$' --manual=true
go test -v $tags -run='TestTruncateVlogNoClose3$' --manual=true
rm -rf p

InstallJemalloc
# Run the key count test for stream writer.
go test -v -tags jemalloc -run='TestKeyCount' --manual=true

# Run the normal tests.
echo "==> Starting tests.. "
# go test -timeout=25m -v -race github.com/dgraph-io/badger/v2/...
Expand Down
2 changes: 0 additions & 2 deletions value.go
Original file line number Diff line number Diff line change
Expand Up @@ -853,7 +853,6 @@ func (vlog *valueLog) write(reqs []*request) error {
if err != nil {
return err
}
// TODO(Naman): Add a test to check that meta bits are correctly set?
// Restore the meta.
e.meta = tmpMeta

Expand Down Expand Up @@ -973,7 +972,6 @@ func (vlog *valueLog) pickLog(discardRatio float64) *logFile {
defer vlog.filesLock.RUnlock()

LOOP:
// TODO(naman): Add a test for MaxDiscard which checks for the zero value.
// Pick a candidate that contains the largest amount of discardable data
fid, discard := vlog.discardStats.MaxDiscard()

Expand Down
57 changes: 57 additions & 0 deletions value_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1150,3 +1150,60 @@ func TestValidateWrite(t *testing.T) {
err = log.validateWrites([]*request{req1, req})
require.Error(t, err)
}

func TestValueLogMeta(t *testing.T) {
dir, err := ioutil.TempDir("", "badger-test")
y.Check(err)
defer removeDir(dir)

opt := getTestOptions(dir).WithValueThreshold(16)
db, _ := Open(opt)
defer db.Close()
txn := db.NewTransaction(true)
for i := 0; i < 10; i++ {
k := []byte(fmt.Sprintf("key=%d", i))
v := []byte(fmt.Sprintf("val=%020d", i))
require.NoError(t, txn.SetEntry(NewEntry(k, v)))
}
require.NoError(t, txn.Commit())
fids := db.vlog.sortedFids()
require.Equal(t, 1, len(fids))

// vlog entries must not have txn meta.
db.vlog.filesMap[fids[0]].iterate(true, 0, func(e Entry, vp valuePointer) error {
require.Zero(t, e.meta&(bitTxn|bitFinTxn))
return nil
})

// Entries in LSM tree must have txn bit of meta set
txn = db.NewTransaction(false)
defer txn.Discard()
iopt := DefaultIteratorOptions
key := []byte("key")
iopt.Prefix = key
itr := txn.NewIterator(iopt)
defer itr.Close()
var count int
for itr.Seek(key); itr.ValidForPrefix(key); itr.Next() {
item := itr.Item()
require.Equal(t, bitTxn, item.meta&(bitTxn|bitFinTxn))
count++
}
require.Equal(t, 10, count)
}

// This tests asserts the condition that vlog fids start from 1.
// TODO(naman): should this be changed to assert instead?
func TestFirstVlogFile(t *testing.T) {
dir, err := ioutil.TempDir("", "badger-test")
require.NoError(t, err)
defer removeDir(dir)

opt := DefaultOptions(dir)
db, err := Open(opt)
defer db.Close()

fids := db.vlog.sortedFids()
require.NotZero(t, len(fids))
require.Equal(t, uint32(1), fids[0])
}

0 comments on commit 34f2d04

Please sign in to comment.