diff --git a/.travis.yml b/.travis.yml index 825bad7ca..5439132bc 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,38 +1,26 @@ language: go go: - - "1.11" - - "1.12" - - "1.13" - - tip + - "1.11" + - "1.12" + - "1.13" + - tip os: - - osx -env: - jobs: - - GOARCH=386 - - GOARCH=amd64 - global: - - secure: CRkV2+/jlO0gXzzS50XGxfMS117FNwiVjxNY/LeWq06RKD+dDCPxTJl3JCNe3l0cYEPAglV2uMMYukDiTqJ7e+HI4nh4N4mv6lwx39N8dAvJe1x5ITS2T4qk4kTjuQb1Q1vw/ZOxoQqmvNKj2uRmBdJ/HHmysbRJ1OzCWML3OXdUwJf0AYlJzTjpMfkOKr7sTtE4rwyyQtd4tKH1fGdurgI9ZuFd9qvYxK2qcJhsQ6CNqMXt+7FkVkN1rIPmofjjBTNryzUr4COFXuWH95aDAif19DeBW4lbNgo1+FpDsrgmqtuhl6NAuptI8q/imow2KXBYJ8JPXsxW8DVFj0IIp0RCd3GjaEnwBEbxAyiIHLfW7AudyTS/dJOvZffPqXnuJ8xj3OPIdNe4xY0hWl8Ju2HhKfLOAHq7VadHZWd3IHLil70EiL4/JLD1rNbMImUZisFaA8pyrcIvYYebjOnk4TscwKFLedClRSX1XsMjWWd0oykQtrdkHM2IxknnBpaLu7mFnfE07f6dkG0nlpyu4SCLey7hr5FdcEmljA0nIxTSYDg6035fQkBEAbe7hlESOekkVNT9IZPwG+lmt3vU4ofi6NqNbJecOuSB+h36IiZ9s4YQtxYNnLgW14zjuFGGyT5smc3IjBT7qngDjKIgyrSVoRkY/8udy9qbUgvBeW8= - - + - osx + jobs: allow_failures: - go: tip - exclude: - # Exclude builds for 386 architecture on go 1.11, 1.12 and tip - # Since we don't want it to run for 32 bit - - go: "1.11" - env: GOARCH=386 - - go: "1.12" - env: GOARCH=386 - - go: tip - env: GOARCH=386 notifications: email: false slack: secure: X7uBLWYbuUhf8QFE16CoS5z7WvFR8EN9j6cEectMW6mKZ3vwXGwVXRIPsgUq/606DsQdCCx34MR8MRWYGlu6TBolbSe9y0EP0i46yipPz22YtuT7umcVUbGEyx8MZKgG0v1u/zA0O4aCsOBpGAA3gxz8h3JlEHDt+hv6U8xRsSllVLzLSNb5lwxDtcfEDxVVqP47GMEgjLPM28Pyt5qwjk7o5a4YSVzkfdxBXxd3gWzFUWzJ5E3cTacli50dK4GVfiLcQY2aQYoYO7AAvDnvP+TPfjDkBlUEE4MUz5CDIN51Xb+WW33sX7g+r3Bj7V5IRcF973RiYkpEh+3eoiPnyWyxhDZBYilty3b+Hysp6d4Ov/3I3ll7Bcny5+cYjakjkMH3l9w3gs6Y82GlpSLSJshKWS8vPRsxFe0Pstj6QSJXTd9EBaFr+l1ScXjJv/Sya9j8N9FfTuOTESWuaL1auX4Y7zEEVHlA8SCNOO8K0eTfxGZnC/YcIHsR8rePEAcFxfOYQppkyLF/XvAtnb/LMUuu0g4y2qNdme6Oelvyar1tFEMRtbl4mRCdu/krXBFtkrsfUaVY6WTPdvXAGotsFJ0wuA53zGVhlcd3+xAlSlR3c1QX95HIMeivJKb5L4nTjP+xnrmQNtnVk+tG4LSH2ltuwcZSSczModtcBmRefrk= +env: + global: + - secure: CRkV2+/jlO0gXzzS50XGxfMS117FNwiVjxNY/LeWq06RKD+dDCPxTJl3JCNe3l0cYEPAglV2uMMYukDiTqJ7e+HI4nh4N4mv6lwx39N8dAvJe1x5ITS2T4qk4kTjuQb1Q1vw/ZOxoQqmvNKj2uRmBdJ/HHmysbRJ1OzCWML3OXdUwJf0AYlJzTjpMfkOKr7sTtE4rwyyQtd4tKH1fGdurgI9ZuFd9qvYxK2qcJhsQ6CNqMXt+7FkVkN1rIPmofjjBTNryzUr4COFXuWH95aDAif19DeBW4lbNgo1+FpDsrgmqtuhl6NAuptI8q/imow2KXBYJ8JPXsxW8DVFj0IIp0RCd3GjaEnwBEbxAyiIHLfW7AudyTS/dJOvZffPqXnuJ8xj3OPIdNe4xY0hWl8Ju2HhKfLOAHq7VadHZWd3IHLil70EiL4/JLD1rNbMImUZisFaA8pyrcIvYYebjOnk4TscwKFLedClRSX1XsMjWWd0oykQtrdkHM2IxknnBpaLu7mFnfE07f6dkG0nlpyu4SCLey7hr5FdcEmljA0nIxTSYDg6035fQkBEAbe7hlESOekkVNT9IZPwG+lmt3vU4ofi6NqNbJecOuSB+h36IiZ9s4YQtxYNnLgW14zjuFGGyT5smc3IjBT7qngDjKIgyrSVoRkY/8udy9qbUgvBeW8= + before_script: - go get github.com/mattn/goveralls script: diff --git a/CHANGELOG.md b/CHANGELOG.md index 1115ef506..6408ebcdc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,31 +4,12 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) and this project adheres to [Serialization Versioning](VERSIONING.md). -## [2.0.2] - 2020-02-26 - -### Fixed - -- Cast sz to uint32 to fix compilation on 32 bit. (#1175) -- Fix checkOverlap in compaction. (#1166) -- Avoid sync in inmemory mode. (#1190) -- Support disabling the cache completely. (#1185) -- Add support for caching bloomfilters. (#1204) -- Fix int overflow for 32bit. (#1216) -- Remove the 'this entry should've caught' log from value.go. (#1170) -- Rework concurrency semantics of valueLog.maxFid. 
(#1187) - -### Performance - -- Use fastRand instead of locked-rand in skiplist. (#1173) -- Improve write stalling on level 0 and 1. (#1186) -- Disable compression and set ZSTD Compression Level to 1. (#1191) - ## [2.0.1] - 2020-01-02 ### New APIs - badger.Options - - WithInMemory (f5b6321) + - WithInMemory (5b6321) - WithZSTDCompressionLevel (3eb4e72) - Badger.TableInfo @@ -293,7 +274,7 @@ Bug fix: ## [1.0.1] - 2017-11-06 * Fix an uint16 overflow when resizing key slice -[2.0.2]: https://github.com/dgraph-io/badger/compare/v2.0.1...v2.0.2 +[Unreleased]: https://github.com/dgraph-io/badger/compare/v2.0.1...HEAD [2.0.1]: https://github.com/dgraph-io/badger/compare/v2.0.0...v2.0.1 [2.0.0]: https://github.com/dgraph-io/badger/compare/v1.6.0...v2.0.0 [1.6.0]: https://github.com/dgraph-io/badger/compare/v1.5.5...v1.6.0 diff --git a/README.md b/README.md index 25fc2a4e9..a3064653c 100644 --- a/README.md +++ b/README.md @@ -252,7 +252,7 @@ on it. ```go err := db.Update(func(txn *badger.Txn) error { - e := badger.NewEntry([]byte("answer"), []byte("42")) + e := NewEntry([]byte("answer"), []byte("42")) err := txn.SetEntry(e) return err }) @@ -401,7 +401,7 @@ and `Txn.SetEntry()` API methods. ```go err := db.Update(func(txn *badger.Txn) error { - e := badger.NewEntry([]byte("answer"), []byte("42")).WithTTL(time.Hour) + e := NewEntry([]byte("answer"), []byte("42")).WithTTL(time.Hour) err := txn.SetEntry(e) return err }) @@ -414,7 +414,7 @@ metadata can be set using `Entry.WithMeta()` and `Txn.SetEntry()` API methods. ```go err := db.Update(func(txn *badger.Txn) error { - e := badger.NewEntry([]byte("answer"), []byte("42")).WithMeta(byte(1)) + e := NewEntry([]byte("answer"), []byte("42")).WithMeta(byte(1)) err := txn.SetEntry(e) return err }) @@ -425,7 +425,7 @@ then can be set using `Txn.SetEntry()`. ```go err := db.Update(func(txn *badger.Txn) error { - e := badger.NewEntry([]byte("answer"), []byte("42")).WithMeta(byte(1)).WithTTL(time.Hour) + e := NewEntry([]byte("answer"), []byte("42")).WithMeta(byte(1)).WithTTL(time.Hour) err := txn.SetEntry(e) return err }) @@ -748,7 +748,6 @@ Below is a list of known projects that use Badger: * [0-stor](https://github.com/zero-os/0-stor) - Single device object store. * [Dgraph](https://github.com/dgraph-io/dgraph) - Distributed graph database. -* [Jaeger](https://github.com/jaegertracing/jaeger) - Distributed tracing platform. * [TalariaDB](https://github.com/grab/talaria) - Distributed, low latency time-series database. * [Dispatch Protocol](https://github.com/dispatchlabs/disgo) - Blockchain protocol for distributed application data analytics. * [Sandglass](https://github.com/celrenheit/sandglass) - distributed, horizontally scalable, persistent, time sorted message queue. 
diff --git a/backup_test.go b/backup_test.go index b652c89e3..16ef8ed5c 100644 --- a/backup_test.go +++ b/backup_test.go @@ -116,8 +116,9 @@ func TestBackupRestore1(t *testing.T) { func TestBackupRestore2(t *testing.T) { tmpdir, err := ioutil.TempDir("", "badger-test") - require.NoError(t, err) - + if err != nil { + t.Fatal(err) + } defer removeDir(tmpdir) s1Path := filepath.Join(tmpdir, "test1") @@ -125,9 +126,9 @@ func TestBackupRestore2(t *testing.T) { s3Path := filepath.Join(tmpdir, "test3") db1, err := Open(getTestOptions(s1Path)) - require.NoError(t, err) - - defer db1.Close() + if err != nil { + t.Fatal(err) + } key1 := []byte("key1") key2 := []byte("key2") rawValue := []byte("NotLongValue") @@ -138,8 +139,9 @@ func TestBackupRestore2(t *testing.T) { } return tx.SetEntry(NewEntry(key2, rawValue)) }) - require.NoError(t, err) - + if err != nil { + t.Fatal(err) + } for i := byte(1); i < N; i++ { err = db1.Update(func(tx *Txn) error { if err := tx.SetEntry(NewEntry(append(key1, i), rawValue)); err != nil { @@ -147,21 +149,25 @@ func TestBackupRestore2(t *testing.T) { } return tx.SetEntry(NewEntry(append(key2, i), rawValue)) }) - require.NoError(t, err) - + if err != nil { + t.Fatal(err) + } } var backup bytes.Buffer _, err = db1.Backup(&backup, 0) - require.NoError(t, err) - + if err != nil { + t.Fatal(err) + } fmt.Println("backup1 length:", backup.Len()) db2, err := Open(getTestOptions(s2Path)) - require.NoError(t, err) - - defer db2.Close() + if err != nil { + t.Fatal(err) + } err = db2.Load(&backup, 16) - require.NoError(t, err) + if err != nil { + t.Fatal(err) + } for i := byte(1); i < N; i++ { err = db2.View(func(tx *Txn) error { @@ -182,8 +188,9 @@ func TestBackupRestore2(t *testing.T) { } return nil }) - require.NoError(t, err) - + if err != nil { + t.Fatal(err) + } } for i := byte(1); i < N; i++ { @@ -193,22 +200,26 @@ func TestBackupRestore2(t *testing.T) { } return tx.SetEntry(NewEntry(append(key2, i), rawValue)) }) - require.NoError(t, err) - + if err != nil { + t.Fatal(err) + } } backup.Reset() _, err = db2.Backup(&backup, 0) - require.NoError(t, err) - + if err != nil { + t.Fatal(err) + } fmt.Println("backup2 length:", backup.Len()) db3, err := Open(getTestOptions(s3Path)) - require.NoError(t, err) - - defer db3.Close() + if err != nil { + t.Fatal(err) + } err = db3.Load(&backup, 16) - require.NoError(t, err) + if err != nil { + t.Fatal(err) + } for i := byte(1); i < N; i++ { err = db3.View(func(tx *Txn) error { @@ -229,8 +240,9 @@ func TestBackupRestore2(t *testing.T) { } return nil }) - require.NoError(t, err) - + if err != nil { + t.Fatal(err) + } } } @@ -298,8 +310,9 @@ func TestBackup(t *testing.T) { } t.Run("disk mode", func(t *testing.T) { tmpdir, err := ioutil.TempDir("", "badger-test") - require.NoError(t, err) - + if err != nil { + t.Fatal(err) + } defer removeDir(tmpdir) opt := DefaultOptions(filepath.Join(tmpdir, "backup0")) runBadgerTest(t, &opt, func(t *testing.T, db *DB) { @@ -317,9 +330,11 @@ func TestBackup(t *testing.T) { func TestBackupRestore3(t *testing.T) { var bb bytes.Buffer - tmpdir, err := ioutil.TempDir("", "badger-test") - require.NoError(t, err) + tmpdir, err := ioutil.TempDir("", "badger-test") + if err != nil { + t.Fatal(err) + } defer removeDir(tmpdir) N := 1000 @@ -328,9 +343,10 @@ func TestBackupRestore3(t *testing.T) { // backup { db1, err := Open(DefaultOptions(filepath.Join(tmpdir, "backup1"))) - require.NoError(t, err) + if err != nil { + t.Fatal(err) + } - defer db1.Close() require.NoError(t, populateEntries(db1, entries)) _, err = 
db1.Backup(&bb, 0) @@ -342,9 +358,9 @@ func TestBackupRestore3(t *testing.T) { // restore db2, err := Open(DefaultOptions(filepath.Join(tmpdir, "restore1"))) - require.NoError(t, err) - - defer db2.Close() + if err != nil { + t.Fatal(err) + } require.NoError(t, db2.Load(&bb, 16)) // verify @@ -374,8 +390,9 @@ func TestBackupRestore3(t *testing.T) { func TestBackupLoadIncremental(t *testing.T) { tmpdir, err := ioutil.TempDir("", "badger-test") - require.NoError(t, err) - + if err != nil { + t.Fatal(err) + } defer removeDir(tmpdir) N := 100 @@ -386,9 +403,9 @@ func TestBackupLoadIncremental(t *testing.T) { // backup { db1, err := Open(DefaultOptions(filepath.Join(tmpdir, "backup2"))) - require.NoError(t, err) - - defer db1.Close() + if err != nil { + t.Fatal(err) + } require.NoError(t, populateEntries(db1, entries)) since, err := db1.Backup(&bb, 0) @@ -446,10 +463,9 @@ func TestBackupLoadIncremental(t *testing.T) { // restore db2, err := Open(getTestOptions(filepath.Join(tmpdir, "restore2"))) - require.NoError(t, err) - - defer db2.Close() - + if err != nil { + t.Fatal(err) + } require.NoError(t, db2.Load(&bb, 16)) // verify diff --git a/db.go b/db.go index a40aa8d92..4cffe9cea 100644 --- a/db.go +++ b/db.go @@ -146,8 +146,7 @@ func (db *DB) replayFunction() func(Entry, valuePointer) error { ExpiresAt: e.ExpiresAt, } - switch { - case e.meta&bitFinTxn > 0: + if e.meta&bitFinTxn > 0 { txnTs, err := strconv.ParseUint(string(e.Value), 10, 64) if err != nil { return errors.Wrapf(err, "Unable to parse txn fin: %q", e.Value) @@ -161,7 +160,7 @@ func (db *DB) replayFunction() func(Entry, valuePointer) error { txn = txn[:0] lastCommit = 0 - case e.meta&bitTxn > 0: + } else if e.meta&bitTxn > 0 { txnTs := y.ParseTs(nk) if lastCommit == 0 { lastCommit = txnTs @@ -175,7 +174,7 @@ func (db *DB) replayFunction() func(Entry, valuePointer) error { te := txnEntry{nk: nk, v: v} txn = append(txn, te) - default: + } else { // This entry is from a rewrite. toLSM(nk, v) @@ -279,6 +278,17 @@ func Open(opt Options) (db *DB, err error) { elog = trace.NewEventLog("Badger", "DB") } + config := ristretto.Config{ + // Use 5% of cache memory for storing counters. + NumCounters: int64(float64(opt.MaxCacheSize) * 0.05 * 2), + MaxCost: int64(float64(opt.MaxCacheSize) * 0.95), + BufferItems: 64, + Metrics: true, + } + cache, err := ristretto.NewCache(&config) + if err != nil { + return nil, errors.Wrap(err, "failed to create cache") + } db = &DB{ imm: make([]*skl.Skiplist, 0, opt.NumMemtables), flushChan: make(chan flushTask, opt.NumMemtables), @@ -290,20 +300,7 @@ func Open(opt Options) (db *DB, err error) { valueDirGuard: valueDirLockGuard, orc: newOracle(opt), pub: newPublisher(), - } - - if opt.MaxCacheSize > 0 { - config := ristretto.Config{ - // Use 5% of cache memory for storing counters. - NumCounters: int64(float64(opt.MaxCacheSize) * 0.05 * 2), - MaxCost: int64(float64(opt.MaxCacheSize) * 0.95), - BufferItems: 64, - Metrics: true, - } - db.blockCache, err = ristretto.NewCache(&config) - if err != nil { - return nil, errors.Wrap(err, "failed to create cache") - } + blockCache: cache, } if db.opt.InMemory { @@ -389,10 +386,7 @@ func Open(opt Options) (db *DB, err error) { // CacheMetrics returns the metrics for the underlying cache. func (db *DB) CacheMetrics() *ristretto.Metrics { - if db.blockCache != nil { - return db.blockCache.Metrics - } - return nil + return db.blockCache.Metrics } // Close closes a DB. 
It's crucial to call it to ensure all the pending updates make their way to @@ -1056,10 +1050,9 @@ func (db *DB) calculateSize() { return err } ext := filepath.Ext(path) - switch ext { - case ".sst": + if ext == ".sst" { lsmSize += info.Size() - case ".vlog": + } else if ext == ".vlog" { vlogSize += info.Size() } return nil @@ -1216,12 +1209,11 @@ func (seq *Sequence) Release() error { func (seq *Sequence) updateLease() error { return seq.db.Update(func(txn *Txn) error { item, err := txn.Get(seq.key) - switch { - case err == ErrKeyNotFound: + if err == ErrKeyNotFound { seq.next = 0 - case err != nil: + } else if err != nil { return err - default: + } else { var num uint64 if err := item.Value(func(v []byte) error { num = binary.BigEndian.Uint64(v) @@ -1509,7 +1501,6 @@ func (db *DB) dropAll() (func(), error) { db.lc.nextFileID = 1 db.opt.Infof("Deleted %d value log files. DropAll done.\n", num) db.blockCache.Clear() - return resume, nil } diff --git a/db2_test.go b/db2_test.go index 56d9ca145..3885f9aae 100644 --- a/db2_test.go +++ b/db2_test.go @@ -23,7 +23,6 @@ import ( "fmt" "io/ioutil" "log" - "math" "math/rand" "os" "path" @@ -311,7 +310,7 @@ func TestPushValueLogLimit(t *testing.T) { for i := 0; i < 32; i++ { if i == 4 { - v := make([]byte, math.MaxInt32) + v := make([]byte, 2<<30) err := db.Update(func(txn *Txn) error { return txn.SetEntry(NewEntry([]byte(key(i)), v)) }) @@ -628,29 +627,22 @@ func TestL0GCBug(t *testing.T) { return []byte(fmt.Sprintf("%10d", i)) } val := []byte{1, 1, 1, 1, 1, 1, 1, 1} - // Insert 100 entries. This will create about 50*3 vlog files and 6 SST files. - for i := 0; i < 3; i++ { - for j := 0; j < 100; j++ { - err = db1.Update(func(txn *Txn) error { - return txn.SetEntry(NewEntry(key(j), val)) - }) - require.NoError(t, err) - } + // Insert 100 entries. This will create about 50 vlog files and 2 SST files. + for i := 0; i < 100; i++ { + err = db1.Update(func(txn *Txn) error { + return txn.SetEntry(NewEntry(key(i), val)) + }) + require.NoError(t, err) } // Run value log GC multiple times. This would ensure at least // one value log file is garbage collected. - success := 0 for i := 0; i < 10; i++ { err := db1.RunValueLogGC(0.01) - if err == nil { - success++ - } if err != nil && err != ErrNoRewrite { t.Fatalf(err.Error()) } } - // Ensure alteast one GC call was successful. - require.NotZero(t, success) + // CheckKeys reads all the keys previously stored. checkKeys := func(db *DB) { for i := 0; i < 100; i++ { @@ -673,12 +665,7 @@ func TestL0GCBug(t *testing.T) { if db1.valueDirGuard != nil { require.NoError(t, db1.valueDirGuard.release()) } - for _, f := range db1.vlog.filesMap { - require.NoError(t, f.fd.Close()) - } - require.NoError(t, db1.registry.Close()) - require.NoError(t, db1.lc.close()) - require.NoError(t, db1.manifest.close()) + require.NoError(t, db1.vlog.Close()) db2, err := Open(opts) require.NoError(t, err) @@ -736,6 +723,7 @@ func TestWindowsDataLoss(t *testing.T) { opt.Truncate = true db, err = Open(opt) require.NoError(t, err) + // Return after reading one entry. We're simulating a crash. // Simulate a crash by not closing db but releasing the locks. if db.dirLockGuard != nil { @@ -747,12 +735,6 @@ func TestWindowsDataLoss(t *testing.T) { // Don't use vlog.Close here. We don't want to fix the file size. Only un-mmap // the data so that we can truncate the file durning the next vlog.Open. 
require.NoError(t, y.Munmap(db.vlog.filesMap[db.vlog.maxFid].fmap)) - for _, f := range db.vlog.filesMap { - require.NoError(t, f.fd.Close()) - } - require.NoError(t, db.registry.Close()) - require.NoError(t, db.manifest.close()) - require.NoError(t, db.lc.close()) fmt.Println() fmt.Println("Third DB Open") diff --git a/db_test.go b/db_test.go index 59781511a..03ea090dd 100644 --- a/db_test.go +++ b/db_test.go @@ -23,8 +23,10 @@ import ( "flag" "fmt" "io/ioutil" + "log" "math" "math/rand" + "net/http" "os" "path/filepath" "runtime" @@ -287,13 +289,6 @@ func TestGet(t *testing.T) { test(t, db) require.NoError(t, db.Close()) }) - t.Run("cache disabled", func(t *testing.T) { - opts := DefaultOptions("").WithInMemory(true).WithMaxCacheSize(0) - db, err := Open(opts) - require.NoError(t, err) - test(t, db) - require.NoError(t, db.Close()) - }) } func TestGetAfterDelete(t *testing.T) { @@ -1166,9 +1161,6 @@ func TestExpiryImproperDBClose(t *testing.T) { // it would return Truncate Required Error. require.NoError(t, db0.vlog.Close()) - require.NoError(t, db0.registry.Close()) - require.NoError(t, db0.manifest.close()) - db1, err := Open(opt) require.NoError(t, err) err = db1.View(func(txn *Txn) error { @@ -1208,7 +1200,7 @@ func randBytes(n int) []byte { recv := make([]byte, n) in, err := rand.Read(recv) if err != nil { - panic(err) + log.Fatal(err) } return recv[:in] } @@ -1566,6 +1558,9 @@ func TestLSMOnly(t *testing.T) { opts.ValueLogMaxEntries = 100 db, err := Open(opts) require.NoError(t, err) + if err != nil { + t.Fatal(err) + } value := make([]byte, 128) _, err = rand.Read(value) @@ -1577,7 +1572,9 @@ func TestLSMOnly(t *testing.T) { db, err = Open(opts) require.NoError(t, err) - + if err != nil { + t.Fatal(err) + } defer db.Close() require.NoError(t, db.RunValueLogGC(0.2)) } @@ -1673,12 +1670,12 @@ func TestGoroutineLeak(t *testing.T) { func ExampleOpen() { dir, err := ioutil.TempDir("", "badger-test") if err != nil { - panic(err) + log.Fatal(err) } defer removeDir(dir) db, err := Open(DefaultOptions(dir)) if err != nil { - panic(err) + log.Fatal(err) } defer db.Close() @@ -1690,17 +1687,17 @@ func ExampleOpen() { }) if err != nil { - panic(err) + log.Fatal(err) } txn := db.NewTransaction(true) // Read-write txn err = txn.SetEntry(NewEntry([]byte("key"), []byte("value"))) if err != nil { - panic(err) + log.Fatal(err) } err = txn.Commit() if err != nil { - panic(err) + log.Fatal(err) } err = db.View(func(txn *Txn) error { @@ -1717,7 +1714,7 @@ func ExampleOpen() { }) if err != nil { - panic(err) + log.Fatal(err) } // Output: @@ -1728,13 +1725,13 @@ func ExampleOpen() { func ExampleTxn_NewIterator() { dir, err := ioutil.TempDir("", "badger-test") if err != nil { - panic(err) + log.Fatal(err) } defer removeDir(dir) db, err := Open(DefaultOptions(dir)) if err != nil { - panic(err) + log.Fatal(err) } defer db.Close() @@ -1752,13 +1749,13 @@ func ExampleTxn_NewIterator() { for i := 0; i < n; i++ { err := txn.SetEntry(NewEntry(bkey(i), bval(i))) if err != nil { - panic(err) + log.Fatal(err) } } err = txn.Commit() if err != nil { - panic(err) + log.Fatal(err) } opt := DefaultIteratorOptions @@ -1775,7 +1772,7 @@ func ExampleTxn_NewIterator() { return nil }) if err != nil { - panic(err) + log.Fatal(err) } fmt.Printf("Counted %d elements", count) // Output: @@ -1957,10 +1954,79 @@ func TestVerifyChecksum(t *testing.T) { } func TestMain(m *testing.M) { - flag.Parse() + // call flag.Parse() here if TestMain uses flags + go func() { + if err := http.ListenAndServe("localhost:8080", nil); err != nil { 
+ log.Fatalf("Unable to open http port at 8080") + } + }() os.Exit(m.Run()) } +func ExampleDB_Subscribe() { + prefix := []byte{'a'} + + // This key should be printed, since it matches the prefix. + aKey := []byte("a-key") + aValue := []byte("a-value") + + // This key should not be printed. + bKey := []byte("b-key") + bValue := []byte("b-value") + + // Open the DB. + dir, err := ioutil.TempDir("", "badger-test") + if err != nil { + log.Fatal(err) + } + defer removeDir(dir) + db, err := Open(DefaultOptions(dir)) + if err != nil { + log.Fatal(err) + } + defer db.Close() + + // Create the context here so we can cancel it after sending the writes. + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Use the WaitGroup to make sure we wait for the subscription to stop before continuing. + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + cb := func(kvs *KVList) error { + for _, kv := range kvs.Kv { + fmt.Printf("%s is now set to %s\n", kv.Key, kv.Value) + } + return nil + } + if err := db.Subscribe(ctx, cb, prefix); err != nil && err != context.Canceled { + log.Fatal(err) + } + log.Printf("subscription closed") + }() + + // Wait for the above go routine to be scheduled. + time.Sleep(time.Second) + // Write both keys, but only one should be printed in the Output. + err = db.Update(func(txn *Txn) error { return txn.Set(aKey, aValue) }) + if err != nil { + log.Fatal(err) + } + err = db.Update(func(txn *Txn) error { return txn.Set(bKey, bValue) }) + if err != nil { + log.Fatal(err) + } + + log.Printf("stopping subscription") + cancel() + log.Printf("waiting for subscription to close") + wg.Wait() + // Output: + // a-key is now set to a-value +} + func removeDir(dir string) { if err := os.RemoveAll(dir); err != nil { panic(err) diff --git a/go.mod b/go.mod index eae04e485..f7f1fb0d3 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.12 require ( github.com/DataDog/zstd v1.4.1 github.com/cespare/xxhash v1.1.0 - github.com/dgraph-io/ristretto v0.0.2-0.20200115201040-8f368f2f2ab3 + github.com/dgraph-io/ristretto v0.0.0-20191025175511-c1f00be0418e github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 github.com/dustin/go-humanize v1.0.0 github.com/golang/protobuf v1.3.1 diff --git a/go.sum b/go.sum index 4c71dbdf4..60e673a75 100644 --- a/go.sum +++ b/go.sum @@ -13,8 +13,8 @@ github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwc github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/dgraph-io/ristretto v0.0.2-0.20200115201040-8f368f2f2ab3 h1:MQLRM35Pp0yAyBYksjbj1nZI/w6eyRY/mWoM1sFf4kU= -github.com/dgraph-io/ristretto v0.0.2-0.20200115201040-8f368f2f2ab3/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E= +github.com/dgraph-io/ristretto v0.0.0-20191025175511-c1f00be0418e h1:aeUNgwup7PnDOBAD1BOKAqzb/W/NksOj6r3dwKKuqfg= +github.com/dgraph-io/ristretto v0.0.0-20191025175511-c1f00be0418e/go.mod h1:edzKIzGvqUCMzhTVWbiTSe75zD9Xxq0GtSBtFmaUTZs= github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA= github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= diff --git a/level_handler.go b/level_handler.go index 
19ba0892b..dbc2532ba 100644 --- a/level_handler.go +++ b/level_handler.go @@ -188,9 +188,7 @@ func (s *levelHandler) tryAddLevel0Table(t *table.Table) bool { // Need lock as we may be deleting the first table during a level 0 compaction. s.Lock() defer s.Unlock() - // Return false only if L0 is in memory and number of tables is more than number of - // ZeroTableStall. For on disk L0, we should just add the tables to the level. - if s.db.opt.KeepL0InMemory && len(s.tables) >= s.db.opt.NumLevelZeroTablesStall { + if len(s.tables) >= s.db.opt.NumLevelZeroTablesStall { return false } diff --git a/levels.go b/levels.go index 41faf6e0b..0a4b92f26 100644 --- a/levels.go +++ b/levels.go @@ -44,9 +44,12 @@ type levelsController struct { kv *DB cstatus compactStatus +} + +var ( // This is for getting timings between stalls. lastUnstalled time.Time -} +) // revertToManifest checks that all necessary table files exist and removes all table files not // referenced by the manifest. idMap is a set of table file id's that were read from the directory @@ -84,13 +87,12 @@ func newLevelsController(db *DB, mf *Manifest) (*levelsController, error) { for i := 0; i < db.opt.MaxLevels; i++ { s.levels[i] = newLevelHandler(db, i) - switch i { - case 0: + if i == 0 { // Do nothing. - case 1: + } else if i == 1 { // Level 1 probably shouldn't be too much bigger than level 0. s.levels[i].maxTotalSize = db.opt.LevelOneSize - default: + } else { s.levels[i].maxTotalSize = s.levels[i-1].maxTotalSize * int64(db.opt.LevelSizeMultiplier) } s.cstatus.levels[i] = new(levelCompactStatus) @@ -361,15 +363,12 @@ func (s *levelsController) runWorker(lc *y.Closer) { // Can add a done channel or other stuff. case <-ticker.C: prios := s.pickCompactLevels() - loop: for _, p := range prios { - err := s.doCompact(p) - switch err { - case nil: - break loop - case errFillTables: + if err := s.doCompact(p); err == nil { + break + } else if err == errFillTables { // pass - default: + } else { s.kv.opt.Warningf("While running doCompact: %v\n", err) } } @@ -425,42 +424,34 @@ func (s *levelsController) pickCompactLevels() (prios []compactionPriority) { prios = append(prios, pri) } } - // We used to sort compaction priorities based on the score. But, we - // decided to compact based on the level, not the priority. So, upper - // levels (level 0, level 1, etc) always get compacted first, before the - // lower levels -- this allows us to avoid stalls. + sort.Slice(prios, func(i, j int) bool { + return prios[i].score > prios[j].score + }) return prios } -// checkOverlap checks if the given tables overlap with any level from the given "lev" onwards. -func (s *levelsController) checkOverlap(tables []*table.Table, lev int) bool { - kr := getKeyRange(tables...) - for i, lh := range s.levels { - if i < lev { // Skip upper levels. - continue - } - lh.RLock() - left, right := lh.overlappingTables(levelHandlerRLocked{}, kr) - lh.RUnlock() - if right-left > 0 { - return true - } - } - return false -} - -// compactBuildTables merges topTables and botTables to form a list of new tables. +// compactBuildTables merge topTables and botTables to form a list of new tables. func (s *levelsController) compactBuildTables( lev int, cd compactDef) ([]*table.Table, func() error, error) { topTables := cd.top botTables := cd.bot - // Check overlap of the top level with the levels which are not being - // compacted in this compaction. 
We don't need to check overlap of the bottom - // tables with other levels because if the top tables overlap with any of the lower - // levels, it implies bottom level also overlaps because top and bottom tables - // overlap with each other. - hasOverlap := s.checkOverlap(cd.top, cd.nextLevel.level+1) + var hasOverlap bool + { + kr := getKeyRange(cd.top...) + for i, lh := range s.levels { + if i <= lev { // Skip upper levels. + continue + } + lh.RLock() + left, right := lh.overlappingTables(levelHandlerRLocked{}, kr) + lh.RUnlock() + if right-left > 0 { + hasOverlap = true + break + } + } + } // Try to collect stats so that we can inform value log about GC. That would help us find which // value log file should be GCed. @@ -479,10 +470,9 @@ func (s *levelsController) compactBuildTables( // Create iterators across all the tables involved first. var iters []y.Iterator - switch { - case lev == 0: + if lev == 0 { iters = appendIteratorsReversed(iters, topTables, false) - case len(topTables) > 0: + } else if len(topTables) > 0 { y.AssertTrue(len(topTables) == 1) iters = []y.Iterator{topTables[0].NewIterator(false)} } @@ -571,28 +561,22 @@ func (s *levelsController) compactBuildTables( // versions which are below the minReadTs, otherwise, we might end up discarding the // only valid version for a running transaction. numVersions++ - - // Keep the current version and discard all the next versions if - // - The `discardEarlierVersions` bit is set OR - // - We've already processed `NumVersionsToKeep` number of versions - // (including the current item being processed) - lastValidVersion := vs.Meta&bitDiscardEarlierVersions > 0 || - numVersions == s.kv.opt.NumVersionsToKeep - - if isDeletedOrExpired(vs.Meta, vs.ExpiresAt) || lastValidVersion { + lastValidVersion := vs.Meta&bitDiscardEarlierVersions > 0 + if isDeletedOrExpired(vs.Meta, vs.ExpiresAt) || + numVersions > s.kv.opt.NumVersionsToKeep || + lastValidVersion { // If this version of the key is deleted or expired, skip all the rest of the // versions. Ensure that we're only removing versions below readTs. skipKey = y.SafeCopy(skipKey, it.Key()) - switch { - case lastValidVersion: + if lastValidVersion { // Add this key. We have set skipKey, so the following key versions // would be skipped. - case hasOverlap: + } else if hasOverlap { // If this key range has overlap with lower levels, then keep the deletion // marker with the latest version, discarding the rest. We have set skipKey, // so the following key versions would be skipped. - default: + } else { // If no overlap, we can skip all the versions, by continuing here. numSkips++ updateStats(vs) @@ -932,7 +916,7 @@ func (s *levelsController) addLevel0Table(t *table.Table) error { // Stall. Make sure all levels are healthy before we unstall. var timeStart time.Time { - s.elog.Printf("STALLED STALLED STALLED: %v\n", time.Since(s.lastUnstalled)) + s.elog.Printf("STALLED STALLED STALLED: %v\n", time.Since(lastUnstalled)) s.cstatus.RLock() for i := 0; i < s.kv.opt.MaxLevels; i++ { s.elog.Printf("level=%d. Status=%s Size=%d\n", @@ -941,13 +925,15 @@ func (s *levelsController) addLevel0Table(t *table.Table) error { s.cstatus.RUnlock() timeStart = time.Now() } - // Before we unstall, we need to make sure that level 0 is healthy. Otherwise, we - // will very quickly fill up level 0 again. + // Before we unstall, we need to make sure that level 0 and 1 are healthy. 
Otherwise, we + // will very quickly fill up level 0 again and if the compaction strategy favors level 0, + // then level 1 is going to super full. for i := 0; ; i++ { - // It's crucial that this behavior replicates pickCompactLevels' behavior in - // computing compactability in order to guarantee progress. - // Break the loop once L0 has enough space to accommodate new tables. - if !s.isLevel0Compactable() { + // Passing 0 for delSize to compactable means we're treating incomplete compactions as + // not having finished -- we wait for them to finish. Also, it's crucial this behavior + // replicates pickCompactLevels' behavior in computing compactability in order to + // guarantee progress. + if !s.isLevel0Compactable() && !s.levels[1].isCompactable(0) { break } time.Sleep(10 * time.Millisecond) @@ -959,7 +945,7 @@ func (s *levelsController) addLevel0Table(t *table.Table) error { } { s.elog.Printf("UNSTALLED UNSTALLED UNSTALLED: %v\n", time.Since(timeStart)) - s.lastUnstalled = time.Now() + lastUnstalled = time.Now() } } diff --git a/levels_test.go b/levels_test.go deleted file mode 100644 index 688e3d64d..000000000 --- a/levels_test.go +++ /dev/null @@ -1,567 +0,0 @@ -/* - * Copyright 2019 Dgraph Labs, Inc. and Contributors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package badger - -import ( - "math" - "testing" - "time" - - "github.com/dgraph-io/badger/v2/options" - "github.com/dgraph-io/badger/v2/pb" - "github.com/dgraph-io/badger/v2/table" - "github.com/dgraph-io/badger/v2/y" - "github.com/stretchr/testify/require" -) - -// createAndOpen creates a table with the given data and adds it to the given level. -func createAndOpen(db *DB, td []keyValVersion, level int) { - opts := table.Options{ - BlockSize: db.opt.BlockSize, - BloomFalsePositive: db.opt.BloomFalsePositive, - LoadingMode: options.LoadToRAM, - ChkMode: options.NoVerification, - } - b := table.NewTableBuilder(opts) - - // Add all keys and versions to the table. - for _, item := range td { - key := y.KeyWithTs([]byte(item.key), uint64(item.version)) - val := y.ValueStruct{Value: []byte(item.val), Meta: item.meta} - b.Add(key, val, 0) - } - fd, err := y.CreateSyncedFile(table.NewFilename(db.lc.reserveFileID(), db.opt.Dir), true) - if err != nil { - panic(err) - } - - if _, err = fd.Write(b.Finish()); err != nil { - panic(err) - } - tab, err := table.OpenTable(fd, opts) - if err != nil { - panic(err) - } - if err := db.manifest.addChanges([]*pb.ManifestChange{ - newCreateChange(tab.ID(), level, 0, tab.CompressionType()), - }); err != nil { - panic(err) - } - // Add table to the given level. - db.lc.levels[level].tables = append(db.lc.levels[level].tables, tab) -} - -type keyValVersion struct { - key string - val string - version int - meta byte -} - -func TestCheckOverlap(t *testing.T) { - t.Run("overlap", func(t *testing.T) { - // This test consists of one table on level 0 and one on level 1. - // There is an overlap amongst the tables but there is no overlap - // with rest of the levels. 
- t.Run("same keys", func(t *testing.T) { - runBadgerTest(t, nil, func(t *testing.T, db *DB) { - l0 := []keyValVersion{{"foo", "bar", 3, 0}} - l1 := []keyValVersion{{"foo", "bar", 2, 0}} - createAndOpen(db, l0, 0) - createAndOpen(db, l1, 1) - - // Level 0 should overlap with level 0 tables. - require.True(t, db.lc.checkOverlap(db.lc.levels[0].tables, 0)) - // Level 1 should overlap with level 0 tables (they have the same keys). - require.True(t, db.lc.checkOverlap(db.lc.levels[0].tables, 1)) - // Level 2 and 3 should not overlap with level 0 tables. - require.False(t, db.lc.checkOverlap(db.lc.levels[0].tables, 2)) - require.False(t, db.lc.checkOverlap(db.lc.levels[1].tables, 2)) - require.False(t, db.lc.checkOverlap(db.lc.levels[0].tables, 3)) - require.False(t, db.lc.checkOverlap(db.lc.levels[1].tables, 3)) - - }) - }) - t.Run("overlapping keys", func(t *testing.T) { - runBadgerTest(t, nil, func(t *testing.T, db *DB) { - l0 := []keyValVersion{{"a", "x", 1, 0}, {"b", "x", 1, 0}, {"foo", "bar", 3, 0}} - l1 := []keyValVersion{{"foo", "bar", 2, 0}} - createAndOpen(db, l0, 0) - createAndOpen(db, l1, 1) - - // Level 0 should overlap with level 0 tables. - require.True(t, db.lc.checkOverlap(db.lc.levels[0].tables, 0)) - require.True(t, db.lc.checkOverlap(db.lc.levels[1].tables, 1)) - // Level 1 should overlap with level 0 tables, "foo" key is common. - require.True(t, db.lc.checkOverlap(db.lc.levels[0].tables, 1)) - // Level 2 and 3 should not overlap with level 0 tables. - require.False(t, db.lc.checkOverlap(db.lc.levels[0].tables, 2)) - require.False(t, db.lc.checkOverlap(db.lc.levels[0].tables, 3)) - }) - }) - }) - t.Run("non-overlapping", func(t *testing.T) { - runBadgerTest(t, nil, func(t *testing.T, db *DB) { - l0 := []keyValVersion{{"a", "x", 1, 0}, {"b", "x", 1, 0}, {"c", "bar", 3, 0}} - l1 := []keyValVersion{{"foo", "bar", 2, 0}} - createAndOpen(db, l0, 0) - createAndOpen(db, l1, 1) - - // Level 1 should not overlap with level 0 tables - require.False(t, db.lc.checkOverlap(db.lc.levels[0].tables, 1)) - // Level 2 and 3 should not overlap with level 0 tables. - require.False(t, db.lc.checkOverlap(db.lc.levels[0].tables, 2)) - require.False(t, db.lc.checkOverlap(db.lc.levels[0].tables, 3)) - }) - }) -} - -func getAllAndCheck(t *testing.T, db *DB, expected []keyValVersion) { - db.View(func(txn *Txn) error { - opt := DefaultIteratorOptions - opt.AllVersions = true - opt.InternalAccess = true - it := txn.NewIterator(opt) - defer it.Close() - i := 0 - for it.Rewind(); it.Valid(); it.Next() { - require.Less(t, i, len(expected), "DB has more number of key than expected") - item := it.Item() - v, err := item.ValueCopy(nil) - require.NoError(t, err) - // fmt.Printf("k: %s v: %d val: %s\n", item.key, item.Version(), v) - expect := expected[i] - require.Equal(t, expect.key, string(item.Key()), "expected key: %s actual key: %s", - expect.key, item.Key()) - require.Equal(t, expect.val, string(v), "key: %s expected value: %s actual %s", - item.key, expect.val, v) - require.Equal(t, expect.version, int(item.Version()), - "key: %s expected version: %d actual %d", item.key, expect.version, item.Version()) - require.Equal(t, expect.meta, item.meta, - "key: %s expected meta: %d meta %d", item.key, expect.meta, item.meta) - i++ - } - require.Equal(t, len(expected), i, "keys examined should be equal to keys expected") - return nil - }) - -} - -func TestCompaction(t *testing.T) { - // Disable compactions and keep single version of each key. 
- opt := DefaultOptions("").WithNumCompactors(0).WithNumVersionsToKeep(1) - opt.managedTxns = true - t.Run("level 0 to level 1", func(t *testing.T) { - runBadgerTest(t, &opt, func(t *testing.T, db *DB) { - l0 := []keyValVersion{{"foo", "bar", 3, 0}, {"fooz", "baz", 1, 0}} - l01 := []keyValVersion{{"foo", "bar", 2, 0}} - l1 := []keyValVersion{{"foo", "bar", 1, 0}} - // Level 0 has table l0 and l01. - createAndOpen(db, l0, 0) - createAndOpen(db, l01, 0) - // Level 1 has table l1. - createAndOpen(db, l1, 1) - - // Set a high discard timestamp so that all the keys are below the discard timestamp. - db.SetDiscardTs(10) - - getAllAndCheck(t, db, []keyValVersion{ - {"foo", "bar", 3, 0}, {"foo", "bar", 2, 0}, - {"foo", "bar", 1, 0}, {"fooz", "baz", 1, 0}, - }) - cdef := compactDef{ - thisLevel: db.lc.levels[0], - nextLevel: db.lc.levels[1], - top: db.lc.levels[0].tables, - bot: db.lc.levels[1].tables, - } - require.NoError(t, db.lc.runCompactDef(0, cdef)) - // foo version 2 should be dropped after compaction. - getAllAndCheck(t, db, []keyValVersion{{"foo", "bar", 3, 0}, {"fooz", "baz", 1, 0}}) - }) - }) - - t.Run("level 0 to level 1 with lower overlap", func(t *testing.T) { - runBadgerTest(t, &opt, func(t *testing.T, db *DB) { - l0 := []keyValVersion{{"foo", "bar", 3, 0}, {"fooz", "baz", 1, 0}} - l01 := []keyValVersion{{"foo", "bar", 2, 0}} - l1 := []keyValVersion{{"foo", "bar", 1, 0}} - l2 := []keyValVersion{{"foo", "bar", 0, 0}} - // Level 0 has table l0 and l01. - createAndOpen(db, l0, 0) - createAndOpen(db, l01, 0) - // Level 1 has table l1. - createAndOpen(db, l1, 1) - // Level 2 has table l2. - createAndOpen(db, l2, 2) - - // Set a high discard timestamp so that all the keys are below the discard timestamp. - db.SetDiscardTs(10) - - getAllAndCheck(t, db, []keyValVersion{ - {"foo", "bar", 3, 0}, {"foo", "bar", 2, 0}, {"foo", "bar", 1, 0}, - {"foo", "bar", 0, 0}, {"fooz", "baz", 1, 0}, - }) - cdef := compactDef{ - thisLevel: db.lc.levels[0], - nextLevel: db.lc.levels[1], - top: db.lc.levels[0].tables, - bot: db.lc.levels[1].tables, - } - require.NoError(t, db.lc.runCompactDef(0, cdef)) - // foo version 2 and version 1 should be dropped after compaction. - getAllAndCheck(t, db, []keyValVersion{ - {"foo", "bar", 3, 0}, {"foo", "bar", 0, 0}, {"fooz", "baz", 1, 0}, - }) - }) - }) - - t.Run("level 1 to level 2", func(t *testing.T) { - runBadgerTest(t, &opt, func(t *testing.T, db *DB) { - l1 := []keyValVersion{{"foo", "bar", 3, 0}, {"fooz", "baz", 1, 0}} - l2 := []keyValVersion{{"foo", "bar", 2, 0}} - createAndOpen(db, l1, 1) - createAndOpen(db, l2, 2) - - // Set a high discard timestamp so that all the keys are below the discard timestamp. - db.SetDiscardTs(10) - - getAllAndCheck(t, db, []keyValVersion{ - {"foo", "bar", 3, 0}, {"foo", "bar", 2, 0}, {"fooz", "baz", 1, 0}, - }) - cdef := compactDef{ - thisLevel: db.lc.levels[1], - nextLevel: db.lc.levels[2], - top: db.lc.levels[1].tables, - bot: db.lc.levels[2].tables, - } - require.NoError(t, db.lc.runCompactDef(1, cdef)) - // foo version 2 should be dropped after compaction. - getAllAndCheck(t, db, []keyValVersion{{"foo", "bar", 3, 0}, {"fooz", "baz", 1, 0}}) - }) - }) -} - -func TestHeadKeyCleanup(t *testing.T) { - // Disable compactions and keep single version of each key. 
- opt := DefaultOptions("").WithNumCompactors(0).WithNumVersionsToKeep(1) - opt.managedTxns = true - - runBadgerTest(t, &opt, func(t *testing.T, db *DB) { - l0 := []keyValVersion{ - {string(head), "foo", 5, 0}, {string(head), "bar", 4, 0}, {string(head), "baz", 3, 0}, - } - l1 := []keyValVersion{{string(head), "fooz", 2, 0}, {string(head), "foozbaz", 1, 0}} - // Level 0 has table l0 and l01. - createAndOpen(db, l0, 0) - // Level 1 has table l1. - createAndOpen(db, l1, 1) - - // Set a high discard timestamp so that all the keys are below the discard timestamp. - db.SetDiscardTs(10) - - getAllAndCheck(t, db, []keyValVersion{ - {string(head), "foo", 5, 0}, {string(head), "bar", 4, 0}, {string(head), "baz", 3, 0}, - {string(head), "fooz", 2, 0}, {string(head), "foozbaz", 1, 0}, - }) - cdef := compactDef{ - thisLevel: db.lc.levels[0], - nextLevel: db.lc.levels[1], - top: db.lc.levels[0].tables, - bot: db.lc.levels[1].tables, - } - require.NoError(t, db.lc.runCompactDef(0, cdef)) - // foo version 2 should be dropped after compaction. - getAllAndCheck(t, db, []keyValVersion{{string(head), "foo", 5, 0}}) - }) -} - -func TestDiscardTs(t *testing.T) { - // Disable compactions and keep single version of each key. - opt := DefaultOptions("").WithNumCompactors(0).WithNumVersionsToKeep(1) - opt.managedTxns = true - - t.Run("all keys above discardTs", func(t *testing.T) { - runBadgerTest(t, &opt, func(t *testing.T, db *DB) { - l0 := []keyValVersion{{"foo", "bar", 4, 0}, {"fooz", "baz", 3, 0}} - l01 := []keyValVersion{{"foo", "bar", 3, 0}} - l1 := []keyValVersion{{"foo", "bar", 2, 0}} - // Level 0 has table l0 and l01. - createAndOpen(db, l0, 0) - createAndOpen(db, l01, 0) - // Level 1 has table l1. - createAndOpen(db, l1, 1) - - // Set dicardTs to 1. All the keys are above discardTs. - db.SetDiscardTs(1) - - getAllAndCheck(t, db, []keyValVersion{ - {"foo", "bar", 4, 0}, {"foo", "bar", 3, 0}, - {"foo", "bar", 2, 0}, {"fooz", "baz", 3, 0}, - }) - cdef := compactDef{ - thisLevel: db.lc.levels[0], - nextLevel: db.lc.levels[1], - top: db.lc.levels[0].tables, - bot: db.lc.levels[1].tables, - } - require.NoError(t, db.lc.runCompactDef(0, cdef)) - // No keys should be dropped. - getAllAndCheck(t, db, []keyValVersion{ - {"foo", "bar", 4, 0}, {"foo", "bar", 3, 0}, - {"foo", "bar", 2, 0}, {"fooz", "baz", 3, 0}, - }) - }) - }) - t.Run("some keys above discardTs", func(t *testing.T) { - runBadgerTest(t, &opt, func(t *testing.T, db *DB) { - l0 := []keyValVersion{ - {"foo", "bar", 4, 0}, {"foo", "bar", 3, 0}, - {"foo", "bar", 2, 0}, {"fooz", "baz", 2, 0}, - } - l1 := []keyValVersion{{"foo", "bbb", 1, 0}} - createAndOpen(db, l0, 0) - createAndOpen(db, l1, 1) - - // Set dicardTs to 3. foo2 and foo1 should be dropped. - db.SetDiscardTs(3) - - getAllAndCheck(t, db, []keyValVersion{ - {"foo", "bar", 4, 0}, {"foo", "bar", 3, 0}, {"foo", "bar", 2, 0}, - {"foo", "bbb", 1, 0}, {"fooz", "baz", 2, 0}, - }) - cdef := compactDef{ - thisLevel: db.lc.levels[0], - nextLevel: db.lc.levels[1], - top: db.lc.levels[0].tables, - bot: db.lc.levels[1].tables, - } - require.NoError(t, db.lc.runCompactDef(0, cdef)) - // foo1 and foo2 should be dropped. 
- getAllAndCheck(t, db, []keyValVersion{ - {"foo", "bar", 4, 0}, {"foo", "bar", 3, 0}, {"fooz", "baz", 2, 0}, - }) - }) - }) - t.Run("all keys below discardTs", func(t *testing.T) { - runBadgerTest(t, &opt, func(t *testing.T, db *DB) { - l0 := []keyValVersion{{"foo", "bar", 4, 0}, {"fooz", "baz", 3, 0}} - l01 := []keyValVersion{{"foo", "bar", 3, 0}} - l1 := []keyValVersion{{"foo", "bar", 2, 0}} - // Level 0 has table l0 and l01. - createAndOpen(db, l0, 0) - createAndOpen(db, l01, 0) - // Level 1 has table l1. - createAndOpen(db, l1, 1) - - // Set dicardTs to 10. All the keys are below discardTs. - db.SetDiscardTs(10) - - getAllAndCheck(t, db, []keyValVersion{ - {"foo", "bar", 4, 0}, {"foo", "bar", 3, 0}, - {"foo", "bar", 2, 0}, {"fooz", "baz", 3, 0}, - }) - cdef := compactDef{ - thisLevel: db.lc.levels[0], - nextLevel: db.lc.levels[1], - top: db.lc.levels[0].tables, - bot: db.lc.levels[1].tables, - } - require.NoError(t, db.lc.runCompactDef(0, cdef)) - // Only one version of every key should be left. - getAllAndCheck(t, db, []keyValVersion{{"foo", "bar", 4, 0}, {"fooz", "baz", 3, 0}}) - }) - }) -} - -// This is a test to ensure that the first entry with DiscardEarlierversion bit < DiscardTs -// is kept around (when numversionstokeep is infinite). -func TestDiscardFirstVersion(t *testing.T) { - opt := DefaultOptions("") - opt.NumCompactors = 0 - opt.NumVersionsToKeep = math.MaxInt32 - opt.managedTxns = true - - runBadgerTest(t, &opt, func(t *testing.T, db *DB) { - l0 := []keyValVersion{{"foo", "bar", 1, 0}} - l01 := []keyValVersion{{"foo", "bar", 2, bitDiscardEarlierVersions}} - l02 := []keyValVersion{{"foo", "bar", 3, 0}} - l03 := []keyValVersion{{"foo", "bar", 4, 0}} - l04 := []keyValVersion{{"foo", "bar", 9, 0}} - l05 := []keyValVersion{{"foo", "bar", 10, bitDiscardEarlierVersions}} - - // Level 0 has all the tables. - createAndOpen(db, l0, 0) - createAndOpen(db, l01, 0) - createAndOpen(db, l02, 0) - createAndOpen(db, l03, 0) - createAndOpen(db, l04, 0) - createAndOpen(db, l05, 0) - - // Discard Time stamp is set to 7. - db.SetDiscardTs(7) - - // Compact L0 to L1 - cdef := compactDef{ - thisLevel: db.lc.levels[0], - nextLevel: db.lc.levels[1], - top: db.lc.levels[0].tables, - bot: db.lc.levels[1].tables, - } - require.NoError(t, db.lc.runCompactDef(0, cdef)) - - // - Version 10, 9 lie above version 7 so they should be there. - // - Version 4, 3, 2 lie below the discardTs but they don't have the - // "bitDiscardEarlierVersions" versions set so they should not be removed because number - // of versions to keep is set to infinite. - // - Version 1 is below DiscardTS and below the first "bitDiscardEarlierVersions" - // marker so IT WILL BE REMOVED. - ExpectedKeys := []keyValVersion{ - {"foo", "bar", 10, bitDiscardEarlierVersions}, - {"foo", "bar", 9, 0}, - {"foo", "bar", 4, 0}, - {"foo", "bar", 3, 0}, - {"foo", "bar", 2, bitDiscardEarlierVersions}} - - getAllAndCheck(t, db, ExpectedKeys) - }) -} - -// This test ensures we don't stall when L1's size is greater than opt.LevelOneSize. -// We should stall only when L0 tables more than the opt.NumLevelZeroTableStall. -func TestL1Stall(t *testing.T) { - opt := DefaultOptions("") - // Disable all compactions. - opt.NumCompactors = 0 - // Number of level zero tables. - opt.NumLevelZeroTables = 3 - // Addition of new tables will stall if there are 4 or more L0 tables. - opt.NumLevelZeroTablesStall = 4 - // Level 1 size is 10 bytes. - opt.LevelOneSize = 10 - - runBadgerTest(t, &opt, func(t *testing.T, db *DB) { - // Level 0 has 4 tables. 
- db.lc.levels[0].Lock() - db.lc.levels[0].tables = []*table.Table{createEmptyTable(db), createEmptyTable(db), - createEmptyTable(db), createEmptyTable(db)} - db.lc.levels[0].Unlock() - - timeout := time.After(5 * time.Second) - done := make(chan bool) - - // This is important. Set level 1 size more than the opt.LevelOneSize (we've set it to 10). - db.lc.levels[1].totalSize = 100 - go func() { - tab := createEmptyTable(db) - require.NoError(t, db.lc.addLevel0Table(tab)) - tab.DecrRef() - done <- true - }() - time.Sleep(time.Second) - - db.lc.levels[0].Lock() - // Drop two tables from Level 0 so that addLevel0Table can make progress. Earlier table - // count was 4 which is equal to L0 stall count. - toDrop := db.lc.levels[0].tables[:2] - decrRefs(toDrop) - db.lc.levels[0].tables = db.lc.levels[0].tables[2:] - db.lc.levels[0].Unlock() - - select { - case <-timeout: - t.Fatal("Test didn't finish in time") - case <-done: - } - }) -} - -func createEmptyTable(db *DB) *table.Table { - opts := table.Options{ - BloomFalsePositive: db.opt.BloomFalsePositive, - LoadingMode: options.LoadToRAM, - ChkMode: options.NoVerification, - } - b := table.NewTableBuilder(opts) - // Add one key so that we can open this table. - b.Add(y.KeyWithTs([]byte("foo"), 1), y.ValueStruct{}, 0) - - // Open table in memory to avoid adding changes to manifest file. - tab, err := table.OpenInMemoryTable(b.Finish(), db.lc.reserveFileID(), &opts) - if err != nil { - panic(err) - } - - return tab -} - -func TestL0Stall(t *testing.T) { - test := func(t *testing.T, opt *Options) { - runBadgerTest(t, opt, func(t *testing.T, db *DB) { - db.lc.levels[0].Lock() - // Add NumLevelZeroTableStall+1 number of tables to level 0. This would fill up level - // zero and all new additions are expected to stall if L0 is in memory. - for i := 0; i < opt.NumLevelZeroTablesStall+1; i++ { - db.lc.levels[0].tables = append(db.lc.levels[0].tables, createEmptyTable(db)) - } - db.lc.levels[0].Unlock() - - timeout := time.After(5 * time.Second) - done := make(chan bool) - - go func() { - tab := createEmptyTable(db) - require.NoError(t, db.lc.addLevel0Table(tab)) - tab.DecrRef() - done <- true - }() - // Let it stall for a second. - time.Sleep(time.Second) - - select { - case <-timeout: - if opt.KeepL0InMemory { - t.Log("Timeout triggered") - // Mark this test as successful since L0 is in memory and the - // addition of new table to L0 is supposed to stall. - } else { - t.Fatal("Test didn't finish in time") - } - case <-done: - // The test completed before 5 second timeout. Mark it as successful. - } - }) - } - - opt := DefaultOptions("") - opt.EventLogging = false - // Disable all compactions. - opt.NumCompactors = 0 - // Number of level zero tables. - opt.NumLevelZeroTables = 3 - // Addition of new tables will stall if there are 4 or more L0 tables. 
- opt.NumLevelZeroTablesStall = 4 - - t.Run("with KeepL0InMemory", func(t *testing.T) { - opt.KeepL0InMemory = true - test(t, &opt) - }) - t.Run("with L0 on disk", func(t *testing.T) { - opt.KeepL0InMemory = false - test(t, &opt) - }) -} diff --git a/manifest_test.go b/manifest_test.go index 5062b3f1b..c9a51fc60 100644 --- a/manifest_test.go +++ b/manifest_test.go @@ -168,7 +168,6 @@ func TestOverlappingKeyRangeError(t *testing.T) { defer removeDir(dir) kv, err := Open(DefaultOptions(dir)) require.NoError(t, err) - defer kv.Close() lh0 := newLevelHandler(kv, 0) lh1 := newLevelHandler(kv, 1) diff --git a/options.go b/options.go index 4fbe09199..4374fc39d 100644 --- a/options.go +++ b/options.go @@ -21,6 +21,7 @@ import ( "github.com/dgraph-io/badger/v2/options" "github.com/dgraph-io/badger/v2/table" + "github.com/dgraph-io/badger/v2/y" ) // Note: If you add a new option X make sure you also add a WithX method on Options. @@ -101,6 +102,11 @@ type Options struct { // DefaultOptions sets a list of recommended options for good performance. // Feel free to modify these to suit your needs with the WithX methods. func DefaultOptions(path string) Options { + defaultCompression := options.ZSTD + // Use snappy as default compression algorithm if badger is built without CGO. + if !y.CgoEnabled { + defaultCompression = options.Snappy + } return Options{ Dir: path, ValueDir: path, @@ -123,19 +129,16 @@ func DefaultOptions(path string) Options { CompactL0OnClose: true, KeepL0InMemory: true, VerifyValueChecksum: false, - Compression: options.None, + Compression: defaultCompression, MaxCacheSize: 1 << 30, // 1 GB - // The following benchmarks were done on a 4 KB block size (default block size). The - // compression is ratio supposed to increase with increasing compression level but since the - // input for compression algorithm is small (4 KB), we don't get significant benefit at - // level 3. - // no_compression-16 10 502848865 ns/op 165.46 MB/s - - // zstd_compression/level_1-16 7 739037966 ns/op 112.58 MB/s 2.93 - // zstd_compression/level_3-16 7 756950250 ns/op 109.91 MB/s 2.72 - // zstd_compression/level_15-16 1 11135686219 ns/op 7.47 MB/s 4.38 - // Benchmark code can be found in table/builder_test.go file - ZSTDCompressionLevel: 1, - + // Benchmarking compression level against performance showed that level 15 gives + // the best speed vs ratio tradeoff. + // For a data size of 4KB we get + // Level: 3 Ratio: 2.72 Time: 24112 n/s + // Level: 10 Ratio: 2.95 Time: 75655 n/s + // Level: 15 Ratio: 4.38 Time: 239042 n/s + // See https://github.com/dgraph-io/badger/pull/1111#issue-338120757 + ZSTDCompressionLevel: 15, // Nothing to read/write value log using standard File I/O // MemoryMap to mmap() the value log files // (2^30 - 1)*2 when mmapping < 2^31 - 1, max int32. @@ -536,8 +539,7 @@ func (opt Options) WithChecksumVerificationMode(cvMode options.ChecksumVerificat // WithMaxCacheSize returns a new Options value with MaxCacheSize set to the given value. // // This value specifies how much data cache should hold in memory. A small size of cache means lower -// memory consumption and lookups/iterations would take longer. Setting size to zero disables the -// cache altogether. +// memory consumption and lookups/iterations would take longer. func (opt Options) WithMaxCacheSize(size int64) Options { opt.MaxCacheSize = size return opt @@ -558,18 +560,7 @@ func (opt Options) WithInMemory(b bool) Options { // The ZSTD compression algorithm supports 20 compression levels. 
The higher the compression // level, the better is the compression ratio but lower is the performance. Lower levels // have better performance and higher levels have better compression ratios. -// We recommend using level 1 ZSTD Compression Level. Any level higher than 1 seems to -// deteriorate badger's performance. -// The following benchmarks were done on a 4 KB block size (default block size). The compression is -// ratio supposed to increase with increasing compression level but since the input for compression -// algorithm is small (4 KB), we don't get significant benefit at level 3. It is advised to write -// your own benchmarks before choosing a compression algorithm or level. -// -// no_compression-16 10 502848865 ns/op 165.46 MB/s - -// zstd_compression/level_1-16 7 739037966 ns/op 112.58 MB/s 2.93 -// zstd_compression/level_3-16 7 756950250 ns/op 109.91 MB/s 2.72 -// zstd_compression/level_15-16 1 11135686219 ns/op 7.47 MB/s 4.38 -// Benchmark code can be found in table/builder_test.go file +// The default value of ZSTDCompressionLevel is 15. func (opt Options) WithZSTDCompressionLevel(cLevel int) Options { opt.ZSTDCompressionLevel = cLevel return opt diff --git a/skl/skl.go b/skl/skl.go index 43694f14b..cdfc599be 100644 --- a/skl/skl.go +++ b/skl/skl.go @@ -34,11 +34,11 @@ package skl import ( "math" + "math/rand" "sync/atomic" "unsafe" "github.com/dgraph-io/badger/v2/y" - "github.com/dgraph-io/ristretto/z" ) const ( @@ -165,9 +165,9 @@ func (s *node) casNextOffset(h int, old, val uint32) bool { // return n != nil && y.CompareKeys(key, n.key) > 0 //} -func (s *Skiplist) randomHeight() int { +func randomHeight() int { h := 1 - for h < maxHeight && z.FastRand() <= heightIncrease { + for h < maxHeight && rand.Uint32() <= heightIncrease { h++ } return h @@ -300,7 +300,7 @@ func (s *Skiplist) Put(key []byte, v y.ValueStruct) { } // We do need to create a new node. - height := s.randomHeight() + height := randomHeight() x := newNode(s.arena, key, v, height) // Try to increase s.height via CAS. 
diff --git a/skl/skl_test.go b/skl/skl_test.go index 0be7a64e4..6bd075862 100644 --- a/skl/skl_test.go +++ b/skl/skl_test.go @@ -499,7 +499,7 @@ func BenchmarkReadWriteMap(b *testing.B) { b.RunParallel(func(pb *testing.PB) { rng := rand.New(rand.NewSource(time.Now().UnixNano())) for pb.Next() { - if rng.Float32() < readFrac { + if rand.Float32() < readFrac { mutex.RLock() _, ok := m[string(randomKey(rng))] mutex.RUnlock() @@ -516,16 +516,3 @@ func BenchmarkReadWriteMap(b *testing.B) { }) } } - -func BenchmarkWrite(b *testing.B) { - value := newValue(123) - l := NewSkiplist(int64((b.N + 1) * MaxNodeSize)) - defer l.DecrRef() - b.ResetTimer() - b.RunParallel(func(pb *testing.PB) { - rng := rand.New(rand.NewSource(time.Now().UnixNano())) - for pb.Next() { - l.Put(randomKey(rng), y.ValueStruct{Value: value, Meta: 0, UserMeta: 0}) - } - }) -} diff --git a/table/builder_test.go b/table/builder_test.go index 76296562e..3af2ce358 100644 --- a/table/builder_test.go +++ b/table/builder_test.go @@ -57,7 +57,7 @@ func TestTableIndex(t *testing.T) { keysCount := 10000 for _, opt := range opts { builder := NewTableBuilder(opt) - filename := fmt.Sprintf("%s%c%d.sst", os.TempDir(), os.PathSeparator, rand.Uint32()) + filename := fmt.Sprintf("%s%c%d.sst", os.TempDir(), os.PathSeparator, rand.Int63()) f, err := y.OpenSyncedFile(filename, true) require.NoError(t, err) @@ -80,11 +80,11 @@ func TestTableIndex(t *testing.T) { require.NoError(t, err, "unable to write to file") tbl, err := OpenTable(f, opt) - require.NoError(t, err, "unable to open table") if opt.DataKey == nil { // key id is zero if thre is no datakey. require.Equal(t, tbl.KeyID(), uint64(0)) } + require.NoError(t, err, "unable to open table") // Ensure index is built correctly require.Equal(t, blockCount, len(tbl.blockIndex)) @@ -124,42 +124,14 @@ func BenchmarkBuilder(b *testing.B) { vs := y.ValueStruct{Value: []byte(val)} keysCount := 1300000 // This number of entries consumes ~64MB of memory. 
+    for i := 0; i < b.N; i++ {
+        opts := Options{BlockSize: 4 * 1024, BloomFalsePositive: 0.01}
+        builder := NewTableBuilder(opts)
-    bench := func(b *testing.B, opt *Options) {
-        // KeyCount * (keySize + ValSize)
-        b.SetBytes(int64(keysCount) * (32 + 32))
-        for i := 0; i < b.N; i++ {
-            opt.BlockSize = 4 * 1024
-            opt.BloomFalsePositive = 0.01
-            builder := NewTableBuilder(*opt)
-
-            for i := 0; i < keysCount; i++ {
-                builder.Add(key(i), vs, 0)
-            }
-
-            _ = builder.Finish()
+        for i := 0; i < keysCount; i++ {
+            builder.Add(key(i), vs, 0)
         }
-    }
-    b.Run("no compression", func(b *testing.B) {
-        var opt Options
-        opt.Compression = options.None
-        bench(b, &opt)
-    })
-    b.Run("zstd compression", func(b *testing.B) {
-        var opt Options
-        opt.Compression = options.ZSTD
-        b.Run("level 1", func(b *testing.B) {
-            opt.ZSTDCompressionLevel = 1
-            bench(b, &opt)
-        })
-        b.Run("level 3", func(b *testing.B) {
-            opt.ZSTDCompressionLevel = 3
-            bench(b, &opt)
-        })
-        b.Run("level 15", func(b *testing.B) {
-            opt.ZSTDCompressionLevel = 15
-            bench(b, &opt)
-        })
-    })
+        _ = builder.Finish()
+    }
 }
diff --git a/table/merge_iterator.go b/table/merge_iterator.go
index e1809e027..e93edbbb9 100644
--- a/table/merge_iterator.go
+++ b/table/merge_iterator.go
@@ -55,18 +55,17 @@ func (n *node) setIterator(iter y.Iterator) {
 }
 
 func (n *node) setKey() {
-    switch {
-    case n.merge != nil:
+    if n.merge != nil {
         n.valid = n.merge.small.valid
         if n.valid {
             n.key = n.merge.small.key
         }
-    case n.concat != nil:
+    } else if n.concat != nil {
         n.valid = n.concat.Valid()
         if n.valid {
             n.key = n.concat.Key()
         }
-    default:
+    } else {
         n.valid = n.iter.Valid()
         if n.valid {
             n.key = n.iter.Key()
@@ -75,12 +74,11 @@ func (n *node) setKey() {
 }
 
 func (n *node) next() {
-    switch {
-    case n.merge != nil:
+    if n.merge != nil {
         n.merge.Next()
-    case n.concat != nil:
+    } else if n.concat != nil {
         n.concat.Next()
-    default:
+    } else {
         n.iter.Next()
     }
     n.setKey()
@@ -105,22 +103,22 @@ func (mi *MergeIterator) fix() {
         return
     }
     cmp := y.CompareKeys(mi.small.key, mi.bigger().key)
-    switch {
-    case cmp == 0: // Both the keys are equal.
+    // Both the keys are equal.
+    if cmp == 0 {
         // In case of same keys, move the right iterator ahead.
         mi.right.next()
         if &mi.right == mi.small {
             mi.swapSmall()
         }
         return
-    case cmp < 0: // Small is less than bigger().
+    } else if cmp < 0 { // Small is less than bigger().
         if mi.reverse {
             mi.swapSmall()
         } else {
             // we don't need to do anything. Small already points to the smallest.
         }
         return
-    default: // bigger() is less than small.
+    } else { // bigger() is less than small.
         if mi.reverse {
             // Do nothing since we're iterating in reverse. Small currently points to
             // the bigger key and that's okay in reverse iteration.
@@ -208,12 +206,11 @@ func (mi *MergeIterator) Close() error {
 
 // NewMergeIterator creates a merge iterator.
 func NewMergeIterator(iters []y.Iterator, reverse bool) y.Iterator {
-    switch len(iters) {
-    case 0:
+    if len(iters) == 0 {
         return nil
-    case 1:
+    } else if len(iters) == 1 {
         return iters[0]
-    case 2:
+    } else if len(iters) == 2 {
         mi := &MergeIterator{
             reverse: reverse,
         }
diff --git a/table/table.go b/table/table.go
index 25227be34..d68169384 100644
--- a/table/table.go
+++ b/table/table.go
@@ -18,7 +18,6 @@ package table
 
 import (
     "crypto/aes"
-    "encoding/binary"
     "fmt"
     "io"
     "math"
@@ -82,7 +81,7 @@ type TableInterface interface {
     DoesNotHave(hash uint64) bool
 }
 
-// Table represents a loaded table file with the info we have about it.
+// Table represents a loaded table file with the info we have about it
 type Table struct {
     sync.Mutex
 
@@ -98,11 +97,10 @@ type Table struct {
     smallest, biggest []byte // Smallest and largest keys (with timestamps).
     id                uint64 // file id, part of filename
+    bf                *z.Bloom
     Checksum          []byte
 
     // Stores the total size of key-values stored in this table (including the size on vlog).
     estimatedSize uint64
-    indexStart    int
-    indexLen      int
 
     IsInmemory bool // Set to true if the table is on level 0 and opened in memory.
     opt        *Options
@@ -148,13 +146,6 @@ func (t *Table) DecrRef() error {
         if err := os.Remove(filename); err != nil {
             return err
         }
-        // Delete all blocks from the cache.
-        for i := range t.blockIndex {
-            t.opt.Cache.Del(t.blockCacheKey(i))
-        }
-        // Delete bloom filter from the cache.
-        t.opt.Cache.Del(t.bfCacheKey())
-
     }
     return nil
 }
@@ -241,7 +232,6 @@ func OpenTable(fd *os.File, opts Options) (*Table, error) {
     if err := t.initBiggestAndSmallest(); err != nil {
         return nil, errors.Wrapf(err, "failed to initialize table")
     }
-
     if opts.ChkMode == options.OnTableRead || opts.ChkMode == options.OnTableAndBlockRead {
         if err := t.VerifyChecksum(); err != nil {
             _ = fd.Close()
@@ -330,9 +320,6 @@ func (t *Table) readIndex() error {
     readPos -= 4
     buf := t.readNoFail(readPos, 4)
     checksumLen := int(y.BytesToU32(buf))
-    if checksumLen < 0 {
-        return errors.New("checksum length less than zero. Data corrupted")
-    }
 
     // Read checksum.
     expectedChk := &pb.Checksum{}
@@ -345,12 +332,10 @@ func (t *Table) readIndex() error {
     // Read index size from the footer.
     readPos -= 4
     buf = t.readNoFail(readPos, 4)
-    t.indexLen = int(y.BytesToU32(buf))
-
+    indexLen := int(y.BytesToU32(buf))
     // Read index.
-    readPos -= t.indexLen
-    t.indexStart = readPos
-    data := t.readNoFail(readPos, t.indexLen)
+    readPos -= indexLen
+    data := t.readNoFail(readPos, indexLen)
 
     if err := y.VerifyChecksum(data, expectedChk); err != nil {
         return y.Wrapf(err, "failed to verify checksum for table: %s", t.Filename())
@@ -369,18 +354,8 @@ func (t *Table) readIndex() error {
     y.Check(err)
 
     t.estimatedSize = index.EstimatedSize
+    t.bf = z.JSONUnmarshal(index.BloomFilter)
     t.blockIndex = index.Offsets
-
-    // Avoid the cost of unmarshalling the bloom filters if the cache is absent.
-    if t.opt.Cache != nil {
-        var bf *z.Bloom
-        if bf, err = z.JSONUnmarshal(index.BloomFilter); err != nil {
-            return y.Wrapf(err, "failed to unmarshal bloom filter for the table %d in Table.readIndex",
-                t.id)
-        }
-
-        t.opt.Cache.Set(t.bfCacheKey(), bf, int64(len(index.BloomFilter)))
-    }
     return nil
 }
@@ -461,25 +436,10 @@ func (t *Table) block(idx int) (*block, error) {
     return blk, nil
 }
 
-// bfCacheKey returns the cache key for bloom filter.
-func (t *Table) bfCacheKey() []byte {
-    y.AssertTrue(t.id < math.MaxUint32)
-    buf := make([]byte, 4)
-    binary.BigEndian.PutUint32(buf, uint32(t.id))
-
-    // Without the "bf" prefix, we will have conflict with the blockCacheKey.
-    return append([]byte("bf"), buf...)
-}
-
-func (t *Table) blockCacheKey(idx int) []byte {
-    y.AssertTrue(t.id < math.MaxUint32)
+func (t *Table) blockCacheKey(idx int) uint64 {
+    y.AssertTrue(t.ID() < math.MaxUint32)
     y.AssertTrue(uint32(idx) < math.MaxUint32)
-
-    buf := make([]byte, 8)
-    // Assume t.ID does not overflow uint32.
-    binary.BigEndian.PutUint32(buf[:4], uint32(t.ID()))
-    binary.BigEndian.PutUint32(buf[4:], uint32(idx))
-    return buf
+    return (t.ID() << 32) | uint64(idx)
 }
 
 // EstimatedSize returns the total size of key-values stored in this table (including the
@@ -503,44 +463,7 @@ func (t *Table) ID() uint64 { return t.id }
 
 // DoesNotHave returns true if (but not "only if") the table does not have the key hash.
 // It does a bloom filter lookup.
-func (t *Table) DoesNotHave(hash uint64) bool {
-    var bf *z.Bloom
-
-    // Return fast if cache is absent.
-    if t.opt.Cache == nil {
-        bf, _ := t.readBloomFilter()
-        return !bf.Has(hash)
-    }
-
-    // Check if the bloomfilter exists in the cache.
-    if b, ok := t.opt.Cache.Get(t.bfCacheKey()); b != nil && ok {
-        bf = b.(*z.Bloom)
-        return !bf.Has(hash)
-    }
-
-    bf, sz := t.readBloomFilter()
-    t.opt.Cache.Set(t.bfCacheKey(), bf, int64(sz))
-    return !bf.Has(hash)
-}
-
-// readBloomFilter reads the bloom filter from the SST and returns its length
-// along with the bloom filter.
-func (t *Table) readBloomFilter() (*z.Bloom, int) {
-    // Read bloom filter from the SST.
-    data := t.readNoFail(t.indexStart, t.indexLen)
-    index := pb.TableIndex{}
-    var err error
-    // Decrypt the table index if it is encrypted.
-    if t.shouldDecrypt() {
-        data, err = t.decrypt(data)
-        y.Check(err)
-    }
-    y.Check(proto.Unmarshal(data, &index))
-
-    bf, err := z.JSONUnmarshal(index.BloomFilter)
-    y.Check(err)
-    return bf, len(index.BloomFilter)
-}
+func (t *Table) DoesNotHave(hash uint64) bool { return !t.bf.Has(hash) }
 
 // VerifyChecksum verifies checksum for all blocks of table. This function is called by
 // OpenTable() function. This function is also called inside levelsController.VerifyChecksum().
diff --git a/table/table_test.go b/table/table_test.go
index 27a4f1d16..82bddf591 100644
--- a/table/table_test.go
+++ b/table/table_test.go
@@ -77,9 +77,13 @@ func buildTable(t *testing.T, keyValues [][]string, opts Options) *os.File {
     defer b.Close()
     // TODO: Add test for file garbage collection here. No files should be left after the tests here.
 
-    filename := fmt.Sprintf("%s%s%d.sst", os.TempDir(), string(os.PathSeparator), rand.Uint32())
+    filename := fmt.Sprintf("%s%s%d.sst", os.TempDir(), string(os.PathSeparator), rand.Int63())
     f, err := y.CreateSyncedFile(filename, true)
-    require.NoError(t, err)
+    if t != nil {
+        require.NoError(t, err)
+    } else {
+        y.Check(err)
+    }
 
     sort.Slice(keyValues, func(i, j int) bool {
         return keyValues[i][0] < keyValues[j][0]
@@ -739,10 +743,7 @@ func TestTableChecksum(t *testing.T) {
     f := buildTestTable(t, "k", 10000, opts)
     fi, err := f.Stat()
     require.NoError(t, err, "unable to get file information")
-    // Write random bytes at random location.
-    n, err := f.WriteAt(rb, rand.Int63n(fi.Size()))
-    require.NoError(t, err)
-    require.Equal(t, n, len(rb))
+    f.WriteAt(rb, rand.Int63n(fi.Size()))
 
     _, err = OpenTable(f, opts)
     if err == nil || !strings.Contains(err.Error(), "checksum") {
diff --git a/test.sh b/test.sh
index b4e40601a..90d21889c 100755
--- a/test.sh
+++ b/test.sh
@@ -4,43 +4,30 @@
 set -e
 
 go version
 
-packages=$(go list ./... | grep github.com/dgraph-io/badger/v2/)
-
-if [[ ! -z "$TEAMCITY_VERSION" ]]; then
-  export GOFLAGS="-json"
-fi
-
 # Ensure that we can compile the binary.
 pushd badger
 go build -v .
 popd
 
 # Run the memory intensive tests first.
-go test -v -run='TestBigKeyValuePairs$' --manual=true
-go test -v -run='TestPushValueLogLimit' --manual=true
+go test -v --manual=true -run='TestBigKeyValuePairs$'
+go test -v --manual=true -run='TestPushValueLogLimit'
 
 # Run the special Truncate test.
 rm -rf p
-go test -v -run='TestTruncateVlogNoClose$' --manual=true
+go test -v --manual=true -run='TestTruncateVlogNoClose$' .
 truncate --size=4096 p/000000.vlog
-go test -v -run='TestTruncateVlogNoClose2$' --manual=true
-go test -v -run='TestTruncateVlogNoClose3$' --manual=true
+go test -v --manual=true -run='TestTruncateVlogNoClose2$' .
+go test -v --manual=true -run='TestTruncateVlogNoClose3$' .
 rm -rf p
 
 # Then the normal tests.
-echo
-echo "==> Starting test for table, skl and y package"
-go test -v -race github.com/dgraph-io/badger/v2/skl
-# Run test for all package except the top level package. The top level package support the
-# `vlog_mmap` flag which rest of the packages don't support.
-go test -v -race $packages
-
 echo
 echo "==> Starting tests with value log mmapped..."
-# Run top level package tests with mmap flag.
-go test -v -race github.com/dgraph-io/badger/v2 --vlog_mmap=true
+sleep 5
+go test -v --vlog_mmap=true -race ./...
 
 echo
 echo "==> Starting tests with value log not mmapped..."
-go test -v -race github.com/dgraph-io/badger/v2 --vlog_mmap=false
-
+sleep 5
+go test -v --vlog_mmap=false -race ./...
diff --git a/txn_test.go b/txn_test.go
index 647bd9806..8450da910 100644
--- a/txn_test.go
+++ b/txn_test.go
@@ -837,8 +837,9 @@ func TestManagedDB(t *testing.T) {
 
 func TestArmV7Issue311Fix(t *testing.T) {
     dir, err := ioutil.TempDir("", "")
-    require.NoError(t, err)
-
+    if err != nil {
+        t.Fatal(err)
+    }
     defer removeDir(dir)
 
     db, err := Open(DefaultOptions(dir).
@@ -847,21 +848,31 @@ func TestArmV7Issue311Fix(t *testing.T) {
         WithLevelOneSize(8 << 20).
         WithMaxTableSize(2 << 20).
         WithSyncWrites(false))
-
-    require.NoError(t, err)
+    if err != nil {
+        t.Fatalf("cannot open db at location %s: %v", dir, err)
+    }
 
     err = db.View(func(txn *Txn) error { return nil })
-    require.NoError(t, err)
+    if err != nil {
+        t.Fatal(err)
+    }
 
     err = db.Update(func(txn *Txn) error {
         return txn.SetEntry(NewEntry([]byte{0x11}, []byte{0x22}))
     })
-    require.NoError(t, err)
+    if err != nil {
+        t.Fatal(err)
+    }
 
     err = db.Update(func(txn *Txn) error {
         return txn.SetEntry(NewEntry([]byte{0x11}, []byte{0x22}))
     })
-    require.NoError(t, err)
-    require.NoError(t, db.Close())
+    if err != nil {
+        t.Fatal(err)
+    }
+
+    if err = db.Close(); err != nil {
+        t.Fatal(err)
+    }
 }
diff --git a/value.go b/value.go
index b0d94ad86..cdf575a29 100644
--- a/value.go
+++ b/value.go
@@ -197,7 +197,7 @@ func (lf *logFile) encryptionEnabled() bool {
 }
 
 func (lf *logFile) munmap() (err error) {
-    if lf.loadingMode != options.MemoryMap || len(lf.fmap) == 0 {
+    if lf.loadingMode != options.MemoryMap {
         // Nothing to do
         return nil
     }
@@ -436,18 +436,15 @@ func (vlog *valueLog) iterate(lf *logFile, offset uint32, fn logEntry) (uint32,
 
     var lastCommit uint64
     var validEndOffset uint32 = offset
-
-loop:
     for {
         e, err := read.Entry(reader)
-        switch {
-        case err == io.EOF:
-            break loop
-        case err == io.ErrUnexpectedEOF || err == errTruncate:
-            break loop
-        case err != nil:
+        if err == io.EOF {
+            break
+        } else if err == io.ErrUnexpectedEOF || err == errTruncate {
+            break
+        } else if err != nil {
             return 0, err
-        case e == nil:
+        } else if e == nil {
             continue
         }
 
@@ -458,30 +455,29 @@ loop:
         vp.Offset = e.offset
         vp.Fid = lf.fid
 
-        switch {
-        case e.meta&bitTxn > 0:
+        if e.meta&bitTxn > 0 {
             txnTs := y.ParseTs(e.Key)
             if lastCommit == 0 {
                 lastCommit = txnTs
             }
             if lastCommit != txnTs {
-                break loop
+                break
             }
-        case e.meta&bitFinTxn > 0:
+        } else if e.meta&bitFinTxn > 0 {
             txnTs, err := strconv.ParseUint(string(e.Value), 10, 64)
             if err != nil || lastCommit != txnTs {
-                break loop
+                break
             }
             // Got the end of txn. Now we can store them.
             lastCommit = 0
             validEndOffset = read.recordOffset
-        default:
+        } else {
             if lastCommit != 0 {
                 // This is most likely an entry which was moved as part of GC.
                 // We shouldn't get this entry in the middle of a transaction.
-                break loop
+                break
             }
             validEndOffset = read.recordOffset
         }
@@ -497,9 +493,7 @@ loop:
 }
 
 func (vlog *valueLog) rewrite(f *logFile, tr trace.Trace) error {
-    vlog.filesLock.RLock()
-    maxFid := vlog.maxFid
-    vlog.filesLock.RUnlock()
+    maxFid := atomic.LoadUint32(&vlog.maxFid)
     y.AssertTruef(uint32(f.fid) < maxFid, "fid to move: %d. Current max fid: %d", f.fid, maxFid)
     tr.LazyPrintf("Rewriting fid: %d", f.fid)
 
@@ -529,19 +523,12 @@ func (vlog *valueLog) rewrite(f *logFile, tr trace.Trace) error {
             var vp valuePointer
             vp.Decode(vs.Value)
 
-            // If the entry found from the LSM Tree points to a newer vlog file, don't do anything.
             if vp.Fid > f.fid {
                 return nil
             }
-            // If the entry found from the LSM Tree points to an offset greater than the one
-            // read from vlog, don't do anything.
             if vp.Offset > e.offset {
                 return nil
             }
-            // If the entry read from LSM Tree and vlog file point to the same vlog file and offset,
-            // insert them back into the DB.
-            // NOTE: It might be possible that the entry read from the LSM Tree points to
-            // an older vlog file. See the comments in the else part.
             if vp.Fid == f.fid && vp.Offset == e.offset {
                 moved++
                 // This new entry only contains the key, and a pointer to the value.
@@ -577,46 +564,7 @@ func (vlog *valueLog) rewrite(f *logFile, tr trace.Trace) error {
                 wb = append(wb, ne)
                 size += es
             } else {
-                // It might be possible that the entry read from LSM Tree points to an older vlog file.
-                // This can happen in the following situation. Assume DB is opened with
-                // numberOfVersionsToKeep=1
-                //
-                // Now, if we have ONLY one key in the system "FOO" which has been updated 3 times and
-                // the same key has been garbage collected 3 times, we'll have 3 versions of the movekey
-                // for the same key "FOO".
-                // NOTE: moveKeyi is the moveKey with version i
-                // Assume we have 3 move keys in L0.
-                // - moveKey1 (points to vlog file 10),
-                // - moveKey2 (points to vlog file 14) and
-                // - moveKey3 (points to vlog file 15).
-
-                // Also, assume there is another move key "moveKey1" (points to vlog file 6) (this is
-                // also a move Key for key "FOO" ) on upper levels (let's say 3). The move key
-                // "moveKey1" on level 0 was inserted because vlog file 6 was GCed.
-                //
-                // Here's what the arrangement looks like
-                // L0 => (moveKey1 => vlog10), (moveKey2 => vlog14), (moveKey3 => vlog15)
-                // L1 => ....
-                // L2 => ....
-                // L3 => (moveKey1 => vlog6)
-                //
-                // When L0 compaction runs, it keeps only moveKey3 because the number of versions
-                // to keep is set to 1. (we've dropped moveKey1's latest version)
-                //
-                // The new arrangement of keys is
-                // L0 => ....
-                // L1 => (moveKey3 => vlog15)
-                // L2 => ....
-                // L3 => (moveKey1 => vlog6)
-                //
-                // Now if we try to GC vlog file 10, the entry read from vlog file will point to vlog10
-                // but the entry read from LSM Tree will point to vlog6. The move key read from LSM tree
-                // will point to vlog6 because we've asked for version 1 of the move key.
-                //
-                // This might seem like an issue but it's not really an issue because the user has set
-                // the number of versions to keep to 1 and the latest version of moveKey points to the
-                // correct vlog file and offset. The stale move key on L3 will be eventually dropped by
-                // compaction because there is a newer versions in the upper levels.
+                vlog.db.opt.Warningf("This entry should have been caught. %+v\n", e)
             }
             return nil
         }
@@ -814,9 +762,10 @@ func (vlog *valueLog) dropAll() (int, error) {
     }
 
     vlog.db.opt.Infof("Value logs deleted. Creating value log file: 0")
-    if _, err := vlog.createVlogFile(0); err != nil { // Called while writes are stopped.
+    if _, err := vlog.createVlogFile(0); err != nil {
         return count, err
     }
+    atomic.StoreUint32(&vlog.maxFid, 0)
     return count, nil
 }
 
@@ -837,12 +786,12 @@ type valueLog struct {
     // guards our view of which files exist, which to be deleted, how many active iterators
     filesLock        sync.RWMutex
     filesMap         map[uint32]*logFile
-    maxFid           uint32
     filesToBeDeleted []uint32
     // A refcount of iterators -- when this hits zero, we can delete the filesToBeDeleted.
     numActiveIterators int32
 
     db                *DB
+    maxFid            uint32 // accessed via atomics.
     writableLogOffset uint32 // read by read, written by write. Must access via atomics.
     numEntriesWritten uint32
     opt               Options
@@ -907,11 +856,7 @@ func (lf *logFile) open(path string, flags uint32) error {
         return errFile(err, lf.path, "Unable to run file.Stat")
     }
     sz := fi.Size()
-    y.AssertTruef(
-        sz <= math.MaxUint32,
-        "file size: %d greater than %d",
-        uint32(sz), uint32(math.MaxUint32),
-    )
+    y.AssertTruef(sz <= math.MaxUint32, "file size: %d greater than %d", sz, math.MaxUint32)
     lf.size = uint32(sz)
     if sz < vlogHeaderSize {
         // Every vlog file should have at least vlogHeaderSize. If it is less than vlogHeaderSize
@@ -1002,15 +947,14 @@ func (vlog *valueLog) createVlogFile(fid uint32) (*logFile, error) {
     if err = lf.mmap(2 * vlog.opt.ValueLogFileSize); err != nil {
         return nil, errFile(err, lf.path, "Mmap value log file")
     }
-
-    vlog.filesLock.Lock()
-    vlog.filesMap[fid] = lf
-    vlog.maxFid = fid
     // writableLogOffset is only written by write func, by read by Read func.
     // To avoid a race condition, all reads and updates to this variable must be
     // done via atomics.
     atomic.StoreUint32(&vlog.writableLogOffset, vlogHeaderSize)
     vlog.numEntriesWritten = 0
+
+    vlog.filesLock.Lock()
+    vlog.filesMap[fid] = lf
     vlog.filesLock.Unlock()
 
     return lf, nil
@@ -1161,12 +1105,12 @@ func (vlog *valueLog) open(db *DB, ptr valuePointer, replayFn logEntry) error {
         // plain text mode or vice versa. A single vlog file can't have both
        	// encrypted entries and plain text entries.
         if last.encryptionEnabled() != vlog.db.shouldEncrypt() {
-            newid := vlog.maxFid + 1
+            newid := atomic.AddUint32(&vlog.maxFid, 1)
             _, err := vlog.createVlogFile(newid)
             if err != nil {
                 return y.Wrapf(err, "Error while creating log file %d in valueLog.open", newid)
             }
-            last, ok = vlog.filesMap[newid]
+            last, ok = vlog.filesMap[vlog.maxFid]
             y.AssertTrue(ok)
         }
         lastOffset, err := last.fd.Seek(0, io.SeekEnd)
@@ -1228,7 +1172,7 @@ func (vlog *valueLog) Close() error {
             err = munmapErr
         }
 
-        maxFid := vlog.maxFid
+        maxFid := atomic.LoadUint32(&vlog.maxFid)
         if !vlog.opt.ReadOnly && id == maxFid {
             // truncate writable log file to correct offset.
             if truncErr := f.fd.Truncate(
@@ -1321,12 +1265,12 @@ func (reqs requests) IncrRef() {
 // if fid >= vlog.maxFid. In some cases such as replay(while opening db), it might be called with
 // fid < vlog.maxFid. To sync irrespective of file id just call it with math.MaxUint32.
 func (vlog *valueLog) sync(fid uint32) error {
-    if vlog.opt.SyncWrites || vlog.opt.InMemory {
+    if vlog.opt.SyncWrites {
         return nil
     }
 
     vlog.filesLock.RLock()
-    maxFid := vlog.maxFid
+    maxFid := atomic.LoadUint32(&vlog.maxFid)
     // During replay it is possible to get sync call with fid less than maxFid.
     // Because older file has already been synced, we can return from here.
     if fid < maxFid || len(vlog.filesMap) == 0 {
@@ -1359,7 +1303,7 @@ func (vlog *valueLog) write(reqs []*request) error {
         return nil
     }
     vlog.filesLock.RLock()
-    maxFid := vlog.maxFid
+    maxFid := atomic.LoadUint32(&vlog.maxFid)
     curlf := vlog.filesMap[maxFid]
     vlog.filesLock.RUnlock()
 
@@ -1391,7 +1335,7 @@ func (vlog *valueLog) write(reqs []*request) error {
             return err
         }
 
-        newid := vlog.maxFid + 1
+        newid := atomic.AddUint32(&vlog.maxFid, 1)
         y.AssertTruef(newid > 0, "newid has overflown uint32: %v", newid)
         newlf, err := vlog.createVlogFile(newid)
         if err != nil {
@@ -1452,26 +1396,14 @@ func (vlog *valueLog) write(reqs []*request) error {
 
 // Gets the logFile and acquires and RLock() for the mmap. You must call RUnlock on the file
 // (if non-nil)
-func (vlog *valueLog) getFileRLocked(vp valuePointer) (*logFile, error) {
+func (vlog *valueLog) getFileRLocked(fid uint32) (*logFile, error) {
     vlog.filesLock.RLock()
     defer vlog.filesLock.RUnlock()
-    ret, ok := vlog.filesMap[vp.Fid]
+    ret, ok := vlog.filesMap[fid]
     if !ok {
         // log file has gone away, will need to retry the operation.
         return nil, ErrRetry
     }
-
-    // Check for valid offset if we are reading from writable log.
-    maxFid := vlog.maxFid
-    if vp.Fid == maxFid {
-        currentOffset := vlog.woffset()
-        if vp.Offset >= currentOffset {
-            return nil, errors.Errorf(
-                "Invalid value pointer offset: %d greater than current offset: %d",
-                vp.Offset, currentOffset)
-        }
-    }
-
     ret.lock.RLock()
     return ret, nil
 }
@@ -1479,6 +1411,13 @@ func (vlog *valueLog) getFileRLocked(vp valuePointer) (*logFile, error) {
 // Read reads the value log at a given location.
 // TODO: Make this read private.
 func (vlog *valueLog) Read(vp valuePointer, s *y.Slice) ([]byte, func(), error) {
+    // Check for valid offset if we are reading from writable log.
+    maxFid := atomic.LoadUint32(&vlog.maxFid)
+    if vp.Fid == maxFid && vp.Offset >= vlog.woffset() {
+        return nil, nil, errors.Errorf(
+            "Invalid value pointer offset: %d greater than current offset: %d",
+            vp.Offset, vlog.woffset())
+    }
     buf, lf, err := vlog.readValueBytes(vp, s)
     // log file is locked so, decide whether to lock immediately or let the caller to
     // unlock it, after caller uses it.
@@ -1528,11 +1467,10 @@ func (vlog *valueLog) getUnlockCallback(lf *logFile) func() {
 // readValueBytes return vlog entry slice and read locked log file. Caller should take care of
 // logFile unlocking.
 func (vlog *valueLog) readValueBytes(vp valuePointer, s *y.Slice) ([]byte, *logFile, error) {
-    lf, err := vlog.getFileRLocked(vp)
+    lf, err := vlog.getFileRLocked(vp.Fid)
     if err != nil {
         return nil, nil, err
     }
-
     buf, err := lf.read(vp, s)
     return buf, lf, err
 }
@@ -1541,11 +1479,10 @@ func (vlog *valueLog) pickLog(head valuePointer, tr trace.Trace) (files []*logFi
     vlog.filesLock.RLock()
     defer vlog.filesLock.RUnlock()
     fids := vlog.sortedFids()
-    switch {
-    case len(fids) <= 1:
+    if len(fids) <= 1 {
         tr.LazyPrintf("Only one or less value log file.")
         return nil
-    case head.Fid == 0:
+    } else if head.Fid == 0 {
         tr.LazyPrintf("Head pointer is at zero.")
         return nil
     }
diff --git a/value_test.go b/value_test.go
index 8aa7c0909..304ac7d41 100644
--- a/value_test.go
+++ b/value_test.go
@@ -780,9 +780,6 @@ func TestPenultimateLogCorruption(t *testing.T) {
     if db0.valueDirGuard != nil {
         require.NoError(t, db0.valueDirGuard.release())
     }
-    require.NoError(t, db0.vlog.Close())
-    require.NoError(t, db0.manifest.close())
-    require.NoError(t, db0.registry.Close())
 
     opt.Truncate = true
     db1, err := Open(opt)
@@ -802,9 +799,7 @@ func TestPenultimateLogCorruption(t *testing.T) {
 func checkKeys(t *testing.T, kv *DB, keys [][]byte) {
     i := 0
     txn := kv.NewTransaction(false)
-    defer txn.Discard()
     iter := txn.NewIterator(IteratorOptions{})
-    defer iter.Close()
     for iter.Seek(keys[0]); iter.Valid(); iter.Next() {
         require.Equal(t, iter.Item().Key(), keys[i])
         i++
diff --git a/y/y_test.go b/y/y_test.go
index d1b963184..168da889b 100644
--- a/y/y_test.go
+++ b/y/y_test.go
@@ -176,7 +176,7 @@ func TestPagebufferReader2(t *testing.T) {
     require.Equal(t, n, 10, "length of buffer and length written should be equal")
     require.NoError(t, err, "unable to write bytes to buffer")
 
-    randOffset := int(rand.Int31n(int32(b.length) - 1))
+    randOffset := int(rand.Int31n(int32(b.length)))
     randLength := int(rand.Int31n(int32(b.length - randOffset)))
     reader := b.NewReaderAt(randOffset)
     // Read randLength bytes.
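Several of the value.go hunks above replace lock-protected reads of vlog.maxFid with atomic.LoadUint32 and atomic.AddUint32, while map mutations stay behind filesLock. A minimal, self-contained sketch of that access pattern follows; the type and helper names (fakeValueLog, nextFile) are invented for illustration and are not badger's actual code.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type fakeValueLog struct {
	filesLock sync.RWMutex
	filesMap  map[uint32]string
	maxFid    uint32 // accessed via atomics, like the field in the struct hunk above
}

// nextFile reserves the next file id with an atomic increment, then registers
// the file in the map under the lock. Readers can learn the newest id with
// atomic.LoadUint32 without taking filesLock at all.
func (v *fakeValueLog) nextFile() uint32 {
	newid := atomic.AddUint32(&v.maxFid, 1)
	v.filesLock.Lock()
	v.filesMap[newid] = fmt.Sprintf("%06d.vlog", newid)
	v.filesLock.Unlock()
	return newid
}

func main() {
	v := &fakeValueLog{filesMap: map[uint32]string{0: "000000.vlog"}}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			v.nextFile()
		}()
	}
	wg.Wait()
	fmt.Println("max fid:", atomic.LoadUint32(&v.maxFid), "files:", len(v.filesMap))
}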