From 8fd574033532c74cab9e34f7c3e4c1a6af3b1db8 Mon Sep 17 00:00:00 2001
From: Ibrahim Jarif
Date: Fri, 6 Mar 2020 13:41:11 +0530
Subject: [PATCH] Add missing commits to release/v2.0 branch (#1245)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* Cast sz to uint32 to fix compilation on 32 bit (#1175)

`env GOOS=linux GOARCH=arm GOARM=7 go build` no longer fails with overflow.
Similar to commit fb0cdb858c78004b9db8f08e91bb713780b20445.

Signed-off-by: Christian Stewart
(cherry picked from commit 465f28aba5fee9a4dac132ccbebaaae6a1d711dd)

* Fix commit sha for WithInMemory in CHANGELOG. (#1172)

(cherry picked from commit 03af216ff00a2840b5edd0f34e433d9bccb4728d)

* Fix windows build (#1177)

Fix windows build and some deepsource warnings
(cherry picked from commit 0f2e6297a34efdb07001eba8531ed85e8b42189e)

* Fix checkOverlap in compaction (#1166)

The overlap check in compaction would keep additional keys in case the levels
under compaction had overlap amongst themselves. This commit fixes it.

This commit also fixes the `numVersionsToKeep` check. Without this commit, we
would end up keeping `2` versions of a key even when the number of versions to
keep was set to `1`. See the `level 0 to level 1 with lower overlap` test.

Fixes https://github.com/dgraph-io/badger/issues/1053

The following test fails on master but works with this commit
```go
func TestCompaction(t *testing.T) {
    t.Run("level 0 to level 1", func(t *testing.T) {
        dir, err := ioutil.TempDir("", "badger-test")
        require.NoError(t, err)
        defer removeDir(dir)

        // Disable compactions and keep single version of each key.
        opt := DefaultOptions(dir).WithNumCompactors(0).WithNumVersionsToKeep(1)
        db, err := OpenManaged(opt)
        require.NoError(t, err)

        l0 := []keyValVersion{{"foo", "bar", 3}, {"fooz", "baz", 1}}
        l01 := []keyValVersion{{"foo", "bar", 2}}
        l1 := []keyValVersion{{"foo", "bar", 1}}
        // Level 0 has table l0 and l01.
        createAndOpen(db, l0, 0)
        createAndOpen(db, l01, 0)
        // Level 1 has table l1.
        createAndOpen(db, l1, 1)

        // Set a high discard timestamp so that all the keys are below the discard timestamp.
        db.SetDiscardTs(10)

        getAllAndCheck(t, db, []keyValVersion{
            {"foo", "bar", 3}, {"foo", "bar", 2},
            {"foo", "bar", 1}, {"fooz", "baz", 1},
        })
        cdef := compactDef{
            thisLevel: db.lc.levels[0],
            nextLevel: db.lc.levels[1],
            top:       db.lc.levels[0].tables,
            bot:       db.lc.levels[1].tables,
        }
        require.NoError(t, db.lc.runCompactDef(0, cdef))
        // foo version 2 should be dropped after compaction.
        getAllAndCheck(t, db, []keyValVersion{{"foo", "bar", 3}, {"fooz", "baz", 1}})
    })
}
```
(cherry picked from commit 0a0617359d31a70c2137ef5e27f82ee5bb43f6cd)

* Remove the 'this entry should've caught' log from value.go (#1170)

Fixes - https://github.com/dgraph-io/badger/issues/1031
(There wasn't a bug to fix. The log statement shouldn't have been there.)

This PR removes the warning message `WARNING: This entry should have been caught.`.
The warning message assumed that we will always find the **newest key if two keys
have the same version**. This assumption is valid in case of a normal key but it's
**NOT TRUE** in case of **move keys**.

Here's how we can end up fetching the older version of a move key if two move keys
have the same version.

```
It might be possible that the entry read from LSM Tree points to an older vlog file.
This can happen in the following situation.
Assume DB is opened with numberOfVersionsToKeep=1.

Now, if we have ONLY one key in the system "FOO" which has been updated 3 times and
the same key has been garbage collected 3 times, we'll have 3 versions of the movekey
for the same key "FOO".

NOTE: moveKeyi is the moveKey with version i

Assume we have 3 move keys in L0.
- moveKey1 (points to vlog file 10),
- moveKey2 (points to vlog file 14) and
- moveKey3 (points to vlog file 15).

Also, assume there is another move key "moveKey1" (points to vlog file 6)
(this is also a move key for key "FOO") on a lower level (let's say level 3).
The move key "moveKey1" on level 0 was inserted because vlog file 6 was GCed.

Here's what the arrangement looks like
L0 => (moveKey1 => vlog10), (moveKey2 => vlog14), (moveKey3 => vlog15)
L1 => ....
L2 => ....
L3 => (moveKey1 => vlog6)

When L0 compaction runs, it keeps only moveKey3 because the number of versions to
keep is set to 1. (we've dropped moveKey1's latest version)

The new arrangement of keys is
L0 => ....
L1 => (moveKey3 => vlog15)
L2 => ....
L3 => (moveKey1 => vlog6)

Now if we try to GC vlog file 10, the entry read from the vlog file will point to
vlog10 but the entry read from the LSM Tree will point to vlog6. The move key read
from the LSM tree will point to vlog6 because we've asked for version 1 of the move key.

This might seem like an issue but it's not really an issue because the user has set
the number of versions to keep to 1 and the latest version of moveKey points to the
correct vlog file and offset. The stale move key on L3 will eventually be dropped by
compaction because there is a newer version in the upper levels.
```
(cherry picked from commit 2a90c665f1e57fdc48256f222ed5d826c554043c)

* Avoid sync in inmemory mode (#1190)

This makes db.Sync() a no-op when badger is running in in-memory mode. The previous
code would unnecessarily load up an atomic and acquire locks.
(cherry picked from commit 2698bfc11d79feddf9515b1c8e9c25d3b5fba1d0)

* Use fastRand instead of locked-rand in skiplist (#1173)

The math/rand package (https://golang.org/src/math/rand/rand.go) uses a global lock
to allow concurrent access to the rand object. The PR replaces `math.Rand` with
`ristretto/z.FastRand()`. `FastRand` is much faster than `math.Rand` and
`rand.New(..)` generators. The change improves concurrent writes to the skiplist
by ~30%.

```go
func BenchmarkWrite(b *testing.B) {
    value := newValue(123)
    l := NewSkiplist(int64((b.N + 1) * MaxNodeSize))
    defer l.DecrRef()
    b.ResetTimer()
    b.RunParallel(func(pb *testing.PB) {
        rng := rand.New(rand.NewSource(time.Now().UnixNano()))
        for pb.Next() {
            l.Put(randomKey(rng), y.ValueStruct{Value: value, Meta: 0, UserMeta: 0})
        }
    })
}
```
```
name      old time/op  new time/op  delta
Write-16  657ns ± 3%   441ns ± 1%   -32.94%  (p=0.000 n=9+10)
```
(cherry picked from commit 9d6512b081d0c172f5e5153b289f2c97458da01c)

* Add Jaeger to list of projects (#1192)

(cherry picked from commit 01a00cb4e6e39ece7ebff96217ac9ee8e128597f)

* Run all tests on CI (#1189)

This could possibly be a bug in the `go test` command
https://github.com/golang/go/issues/36527 . The `go test` command would skip tests
in sub-packages if the top-level package has a `custom flag` defined and the
sub-packages don't define it. Issue
https://github.com/golang/go/issues/36527#issue-548887632 has an example of this.

This PR also removes the code from the test that would unnecessarily start a web
server. I see two problems here: 1. An unnecessary web server running. 2.
We cannot run multiple tests at the same time, since the second run of the test
would try to start a web server and crash saying `port already in use`.
(cherry picked from commit 5870b7b1975e031549f8a3d4309335f6b03f0fa4)

* Improve write stalling on level 0 and 1

We don't need to stall writes if Level 1 does not have enough space. Level 1 is
stored on the disk and it should be okay to have more tables (and more data) on
Level 1 than the `max level 1 size`. These tables will eventually be compacted to
lower levels.

This commit changes the following:
- We no longer stall writes if L1 doesn't have enough space.
- We stall writes on level 0 only if `KeepL0InMemory` is true.
- Upper levels (L0, L1, etc) get priority in compaction (previously, the level with
  the higher priority score would get preference)
(cherry picked from commit 3747be59ebbbf34382ab8d84207151c7f7014da2)

* Update ristretto to version 8f368f2 (#1195)

This commit pulls the following changes from ristretto
```
git log c1f00be0418e...8f368f2 --oneline
```
```
8f368f2 (HEAD -> master) Fix DeepSource warnings
adb35f0 delete item immediately
fce5e91 Support nil *Cache values in Clear and Close
4e224f9 Add .travis.yml
8350826 Fix the way metrics are handled for deletions
99d1bbb (tag: v0.0.1) default to 128bit hashing for collision checks
1eea1b1 atomic Get-Delete operation for eviction
8d6a8a7 add cache test for identifying MaxCost overflows
ae09250 use the custom KeyToHash function if one is set
1dd5a4d #19 Fixes memory leak due to Oct 1st regression in processItems
```
(cherry picked from commit 82381aceeed606625256e47b877edd6eef5e4011)

* Support disabling the cache completely. (#1183) (#1185)

The cache can be disabled by setting `opt.MaxCacheSize=0`.
(cherry picked from commit 7e5a9568cfb85ef52d18ecb8ebff78183e1f6f08)

* Fix L0/L1 stall test (#1201)

The tests `TestL0Stall` and `TestL1Stall` would never fail because of errors in the
manifest file. This commit fixes it.
(cherry picked from commit 0acb3f6f3d342703f53a56a7e69d310390b9c9ac)

* Disable compression and set ZSTD Compression Level to 1 (#1191)

This PR
- Disables compression. By default, badger does not use any compression.
- Sets the default ZSTD compression level to 1.
Level 15 is very slow for any practical use of badger.
```
no_compression-16              10    502848865 ns/op   165.46 MB/s
zstd_compression/level_1-16     7    739037966 ns/op   112.58 MB/s
zstd_compression/level_3-16     7    756950250 ns/op   109.91 MB/s
zstd_compression/level_15-16    1  11135686219 ns/op     7.47 MB/s
```
(cherry picked from commit c3333a5a830e29133f3b7d63ecf44b54d01e89b2)

* Add support for caching bloomfilters (#1204)

This PR adds support for caching bloom filters in ristretto. The bloom filters and
blocks are removed from the cache when the table is deleted.
(cherry picked from commit 4676ca96f15b9e9a3b1bc163b1a2a908bde553f0)

* Rework concurrency semantics of valueLog.maxFid (#1184) (#1187)

Move all access to `valueLog.maxFid` under `valueLog.filesLock`, while all mutations
happen either with writes stopped or sequentially under valueLog.write. Fixes a
concurrency issue in `valueLog.Read` where the maxFid variable and the
`writableLogOffset` variable could point to two different log files.
(cherry picked from commit 3e25d771067d933c88674f26e5a11ef75c492e90)

* Change else-if statements to idiomatic switch statements.
(#1207) (cherry picked from commit eee160218ee7fe51a25aca6ed4890fb5682bdaca) * Fix flaky TestPageBufferReader2 test (#1210) Fixes https://github.com/dgraph-io/badger/issues/1197 The `TestPageBufferReader2` test would fail often because of an `off-by-1` issue. The problem can be reproduced by setting `randOffset` to the biggest number that randInt31n may return statically like: ``` //randOffset := int(rand.Int31n(int32(b.length))) randOffset := int(int32(b.length-1)) ``` This makes the problem reliably reproducible as the offset is now pointing at EOF. Thus changing the line to this should hopefully solve the problem: `randOffset := int(rand.Int31n(int32(b.length-1 (cherry picked from commit c51748efdced3b41abb111df288c806dff2c242f) * Replace t.Fatal with require.NoError in tests (#1213) We are using the following pattern in tests that can be replaced with `require.NoError(t, err)`. ```go if err != nil { t.Fatal(err) } ``` (cherry picked from commit 78d405a5b2616d4a14b902f4a3094fecdf745819) * Add missing package to README for badger.NewEntry (#1223) (cherry picked from commit 8734e3a53a563d6dd2e7f4cdd74c4d113498c6dd) * Remove ExampleDB_Subscribe Test (#1214) The test ExampleDB_Subscribe doesn't run reliably on appveyor. This commit removes it. (cherry picked from commit e029e93f52c1930934c19c28bb378744858d33b3) * Fix int overflow for 32bit (#1216) - Fix tests for 32 bit systems - Enable 32-bit builds on Travis (cherry picked from commit bce069c3875ec704fee689857d47d241199daf46) * Update CHANGELOG for Badger 2.0.2 release. (#1230) Co-authored-by: Ibrahim Jarif (cherry picked from commit e908818da900fec1c394bf599fcc7cff71bf8054) * Fix changelog for v2.0.2 (cherry picked from commit b81faa561c6dc912cfd22a363524876dad46d664) Co-authored-by: Christian Stewart Co-authored-by: Leyla Galatin <56415703+lgalatin@users.noreply.github.com> Co-authored-by: Damien Tournoud Co-authored-by: Martin Martinez Rivera Co-authored-by: Dieter Plaetinck Co-authored-by: Apoorv Vardhan --- .travis.yml | 32 ++- CHANGELOG.md | 23 +- README.md | 9 +- backup_test.go | 106 ++++---- db.go | 51 ++-- db2_test.go | 38 ++- db_test.go | 114 ++------ go.mod | 2 +- go.sum | 4 +- level_handler.go | 4 +- levels.go | 114 ++++---- levels_test.go | 567 ++++++++++++++++++++++++++++++++++++++++ manifest_test.go | 1 + options.go | 43 +-- skl/skl.go | 8 +- skl/skl_test.go | 15 +- table/builder_test.go | 46 +++- table/merge_iterator.go | 29 +- table/table.go | 97 ++++++- table/table_test.go | 13 +- test.sh | 31 ++- txn_test.go | 27 +- value.go | 145 +++++++--- value_test.go | 5 + y/y_test.go | 2 +- 25 files changed, 1143 insertions(+), 383 deletions(-) create mode 100644 levels_test.go diff --git a/.travis.yml b/.travis.yml index 5439132bc..825bad7ca 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,26 +1,38 @@ language: go go: - - "1.11" - - "1.12" - - "1.13" - - tip + - "1.11" + - "1.12" + - "1.13" + - tip os: - - osx - + - osx +env: + jobs: + - GOARCH=386 + - GOARCH=amd64 + global: + - secure: 
CRkV2+/jlO0gXzzS50XGxfMS117FNwiVjxNY/LeWq06RKD+dDCPxTJl3JCNe3l0cYEPAglV2uMMYukDiTqJ7e+HI4nh4N4mv6lwx39N8dAvJe1x5ITS2T4qk4kTjuQb1Q1vw/ZOxoQqmvNKj2uRmBdJ/HHmysbRJ1OzCWML3OXdUwJf0AYlJzTjpMfkOKr7sTtE4rwyyQtd4tKH1fGdurgI9ZuFd9qvYxK2qcJhsQ6CNqMXt+7FkVkN1rIPmofjjBTNryzUr4COFXuWH95aDAif19DeBW4lbNgo1+FpDsrgmqtuhl6NAuptI8q/imow2KXBYJ8JPXsxW8DVFj0IIp0RCd3GjaEnwBEbxAyiIHLfW7AudyTS/dJOvZffPqXnuJ8xj3OPIdNe4xY0hWl8Ju2HhKfLOAHq7VadHZWd3IHLil70EiL4/JLD1rNbMImUZisFaA8pyrcIvYYebjOnk4TscwKFLedClRSX1XsMjWWd0oykQtrdkHM2IxknnBpaLu7mFnfE07f6dkG0nlpyu4SCLey7hr5FdcEmljA0nIxTSYDg6035fQkBEAbe7hlESOekkVNT9IZPwG+lmt3vU4ofi6NqNbJecOuSB+h36IiZ9s4YQtxYNnLgW14zjuFGGyT5smc3IjBT7qngDjKIgyrSVoRkY/8udy9qbUgvBeW8= + + jobs: allow_failures: - go: tip + exclude: + # Exclude builds for 386 architecture on go 1.11, 1.12 and tip + # Since we don't want it to run for 32 bit + - go: "1.11" + env: GOARCH=386 + - go: "1.12" + env: GOARCH=386 + - go: tip + env: GOARCH=386 notifications: email: false slack: secure: X7uBLWYbuUhf8QFE16CoS5z7WvFR8EN9j6cEectMW6mKZ3vwXGwVXRIPsgUq/606DsQdCCx34MR8MRWYGlu6TBolbSe9y0EP0i46yipPz22YtuT7umcVUbGEyx8MZKgG0v1u/zA0O4aCsOBpGAA3gxz8h3JlEHDt+hv6U8xRsSllVLzLSNb5lwxDtcfEDxVVqP47GMEgjLPM28Pyt5qwjk7o5a4YSVzkfdxBXxd3gWzFUWzJ5E3cTacli50dK4GVfiLcQY2aQYoYO7AAvDnvP+TPfjDkBlUEE4MUz5CDIN51Xb+WW33sX7g+r3Bj7V5IRcF973RiYkpEh+3eoiPnyWyxhDZBYilty3b+Hysp6d4Ov/3I3ll7Bcny5+cYjakjkMH3l9w3gs6Y82GlpSLSJshKWS8vPRsxFe0Pstj6QSJXTd9EBaFr+l1ScXjJv/Sya9j8N9FfTuOTESWuaL1auX4Y7zEEVHlA8SCNOO8K0eTfxGZnC/YcIHsR8rePEAcFxfOYQppkyLF/XvAtnb/LMUuu0g4y2qNdme6Oelvyar1tFEMRtbl4mRCdu/krXBFtkrsfUaVY6WTPdvXAGotsFJ0wuA53zGVhlcd3+xAlSlR3c1QX95HIMeivJKb5L4nTjP+xnrmQNtnVk+tG4LSH2ltuwcZSSczModtcBmRefrk= -env: - global: - - secure: CRkV2+/jlO0gXzzS50XGxfMS117FNwiVjxNY/LeWq06RKD+dDCPxTJl3JCNe3l0cYEPAglV2uMMYukDiTqJ7e+HI4nh4N4mv6lwx39N8dAvJe1x5ITS2T4qk4kTjuQb1Q1vw/ZOxoQqmvNKj2uRmBdJ/HHmysbRJ1OzCWML3OXdUwJf0AYlJzTjpMfkOKr7sTtE4rwyyQtd4tKH1fGdurgI9ZuFd9qvYxK2qcJhsQ6CNqMXt+7FkVkN1rIPmofjjBTNryzUr4COFXuWH95aDAif19DeBW4lbNgo1+FpDsrgmqtuhl6NAuptI8q/imow2KXBYJ8JPXsxW8DVFj0IIp0RCd3GjaEnwBEbxAyiIHLfW7AudyTS/dJOvZffPqXnuJ8xj3OPIdNe4xY0hWl8Ju2HhKfLOAHq7VadHZWd3IHLil70EiL4/JLD1rNbMImUZisFaA8pyrcIvYYebjOnk4TscwKFLedClRSX1XsMjWWd0oykQtrdkHM2IxknnBpaLu7mFnfE07f6dkG0nlpyu4SCLey7hr5FdcEmljA0nIxTSYDg6035fQkBEAbe7hlESOekkVNT9IZPwG+lmt3vU4ofi6NqNbJecOuSB+h36IiZ9s4YQtxYNnLgW14zjuFGGyT5smc3IjBT7qngDjKIgyrSVoRkY/8udy9qbUgvBeW8= - before_script: - go get github.com/mattn/goveralls script: diff --git a/CHANGELOG.md b/CHANGELOG.md index 6408ebcdc..1115ef506 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,12 +4,31 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) and this project adheres to [Serialization Versioning](VERSIONING.md). +## [2.0.2] - 2020-02-26 + +### Fixed + +- Cast sz to uint32 to fix compilation on 32 bit. (#1175) +- Fix checkOverlap in compaction. (#1166) +- Avoid sync in inmemory mode. (#1190) +- Support disabling the cache completely. (#1185) +- Add support for caching bloomfilters. (#1204) +- Fix int overflow for 32bit. (#1216) +- Remove the 'this entry should've caught' log from value.go. (#1170) +- Rework concurrency semantics of valueLog.maxFid. (#1187) + +### Performance + +- Use fastRand instead of locked-rand in skiplist. (#1173) +- Improve write stalling on level 0 and 1. (#1186) +- Disable compression and set ZSTD Compression Level to 1. 
(#1191) + ## [2.0.1] - 2020-01-02 ### New APIs - badger.Options - - WithInMemory (5b6321) + - WithInMemory (f5b6321) - WithZSTDCompressionLevel (3eb4e72) - Badger.TableInfo @@ -274,7 +293,7 @@ Bug fix: ## [1.0.1] - 2017-11-06 * Fix an uint16 overflow when resizing key slice -[Unreleased]: https://github.com/dgraph-io/badger/compare/v2.0.1...HEAD +[2.0.2]: https://github.com/dgraph-io/badger/compare/v2.0.1...v2.0.2 [2.0.1]: https://github.com/dgraph-io/badger/compare/v2.0.0...v2.0.1 [2.0.0]: https://github.com/dgraph-io/badger/compare/v1.6.0...v2.0.0 [1.6.0]: https://github.com/dgraph-io/badger/compare/v1.5.5...v1.6.0 diff --git a/README.md b/README.md index a3064653c..25fc2a4e9 100644 --- a/README.md +++ b/README.md @@ -252,7 +252,7 @@ on it. ```go err := db.Update(func(txn *badger.Txn) error { - e := NewEntry([]byte("answer"), []byte("42")) + e := badger.NewEntry([]byte("answer"), []byte("42")) err := txn.SetEntry(e) return err }) @@ -401,7 +401,7 @@ and `Txn.SetEntry()` API methods. ```go err := db.Update(func(txn *badger.Txn) error { - e := NewEntry([]byte("answer"), []byte("42")).WithTTL(time.Hour) + e := badger.NewEntry([]byte("answer"), []byte("42")).WithTTL(time.Hour) err := txn.SetEntry(e) return err }) @@ -414,7 +414,7 @@ metadata can be set using `Entry.WithMeta()` and `Txn.SetEntry()` API methods. ```go err := db.Update(func(txn *badger.Txn) error { - e := NewEntry([]byte("answer"), []byte("42")).WithMeta(byte(1)) + e := badger.NewEntry([]byte("answer"), []byte("42")).WithMeta(byte(1)) err := txn.SetEntry(e) return err }) @@ -425,7 +425,7 @@ then can be set using `Txn.SetEntry()`. ```go err := db.Update(func(txn *badger.Txn) error { - e := NewEntry([]byte("answer"), []byte("42")).WithMeta(byte(1)).WithTTL(time.Hour) + e := badger.NewEntry([]byte("answer"), []byte("42")).WithMeta(byte(1)).WithTTL(time.Hour) err := txn.SetEntry(e) return err }) @@ -748,6 +748,7 @@ Below is a list of known projects that use Badger: * [0-stor](https://github.com/zero-os/0-stor) - Single device object store. * [Dgraph](https://github.com/dgraph-io/dgraph) - Distributed graph database. +* [Jaeger](https://github.com/jaegertracing/jaeger) - Distributed tracing platform. * [TalariaDB](https://github.com/grab/talaria) - Distributed, low latency time-series database. * [Dispatch Protocol](https://github.com/dispatchlabs/disgo) - Blockchain protocol for distributed application data analytics. * [Sandglass](https://github.com/celrenheit/sandglass) - distributed, horizontally scalable, persistent, time sorted message queue. 
diff --git a/backup_test.go b/backup_test.go index 16ef8ed5c..b652c89e3 100644 --- a/backup_test.go +++ b/backup_test.go @@ -116,9 +116,8 @@ func TestBackupRestore1(t *testing.T) { func TestBackupRestore2(t *testing.T) { tmpdir, err := ioutil.TempDir("", "badger-test") - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) + defer removeDir(tmpdir) s1Path := filepath.Join(tmpdir, "test1") @@ -126,9 +125,9 @@ func TestBackupRestore2(t *testing.T) { s3Path := filepath.Join(tmpdir, "test3") db1, err := Open(getTestOptions(s1Path)) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) + + defer db1.Close() key1 := []byte("key1") key2 := []byte("key2") rawValue := []byte("NotLongValue") @@ -139,9 +138,8 @@ func TestBackupRestore2(t *testing.T) { } return tx.SetEntry(NewEntry(key2, rawValue)) }) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) + for i := byte(1); i < N; i++ { err = db1.Update(func(tx *Txn) error { if err := tx.SetEntry(NewEntry(append(key1, i), rawValue)); err != nil { @@ -149,25 +147,21 @@ func TestBackupRestore2(t *testing.T) { } return tx.SetEntry(NewEntry(append(key2, i), rawValue)) }) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) + } var backup bytes.Buffer _, err = db1.Backup(&backup, 0) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) + fmt.Println("backup1 length:", backup.Len()) db2, err := Open(getTestOptions(s2Path)) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) + + defer db2.Close() err = db2.Load(&backup, 16) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) for i := byte(1); i < N; i++ { err = db2.View(func(tx *Txn) error { @@ -188,9 +182,8 @@ func TestBackupRestore2(t *testing.T) { } return nil }) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) + } for i := byte(1); i < N; i++ { @@ -200,26 +193,22 @@ func TestBackupRestore2(t *testing.T) { } return tx.SetEntry(NewEntry(append(key2, i), rawValue)) }) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) + } backup.Reset() _, err = db2.Backup(&backup, 0) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) + fmt.Println("backup2 length:", backup.Len()) db3, err := Open(getTestOptions(s3Path)) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) + + defer db3.Close() err = db3.Load(&backup, 16) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) for i := byte(1); i < N; i++ { err = db3.View(func(tx *Txn) error { @@ -240,9 +229,8 @@ func TestBackupRestore2(t *testing.T) { } return nil }) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) + } } @@ -310,9 +298,8 @@ func TestBackup(t *testing.T) { } t.Run("disk mode", func(t *testing.T) { tmpdir, err := ioutil.TempDir("", "badger-test") - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) + defer removeDir(tmpdir) opt := DefaultOptions(filepath.Join(tmpdir, "backup0")) runBadgerTest(t, &opt, func(t *testing.T, db *DB) { @@ -330,11 +317,9 @@ func TestBackup(t *testing.T) { func TestBackupRestore3(t *testing.T) { var bb bytes.Buffer - tmpdir, err := ioutil.TempDir("", "badger-test") - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) + defer removeDir(tmpdir) N := 1000 @@ -343,10 +328,9 @@ func TestBackupRestore3(t *testing.T) { // backup { db1, err := Open(DefaultOptions(filepath.Join(tmpdir, "backup1"))) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) + defer db1.Close() require.NoError(t, populateEntries(db1, entries)) _, err = db1.Backup(&bb, 0) @@ -358,9 +342,9 @@ func 
TestBackupRestore3(t *testing.T) { // restore db2, err := Open(DefaultOptions(filepath.Join(tmpdir, "restore1"))) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) + + defer db2.Close() require.NoError(t, db2.Load(&bb, 16)) // verify @@ -390,9 +374,8 @@ func TestBackupRestore3(t *testing.T) { func TestBackupLoadIncremental(t *testing.T) { tmpdir, err := ioutil.TempDir("", "badger-test") - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) + defer removeDir(tmpdir) N := 100 @@ -403,9 +386,9 @@ func TestBackupLoadIncremental(t *testing.T) { // backup { db1, err := Open(DefaultOptions(filepath.Join(tmpdir, "backup2"))) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) + + defer db1.Close() require.NoError(t, populateEntries(db1, entries)) since, err := db1.Backup(&bb, 0) @@ -463,9 +446,10 @@ func TestBackupLoadIncremental(t *testing.T) { // restore db2, err := Open(getTestOptions(filepath.Join(tmpdir, "restore2"))) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) + + defer db2.Close() + require.NoError(t, db2.Load(&bb, 16)) // verify diff --git a/db.go b/db.go index 4cffe9cea..a40aa8d92 100644 --- a/db.go +++ b/db.go @@ -146,7 +146,8 @@ func (db *DB) replayFunction() func(Entry, valuePointer) error { ExpiresAt: e.ExpiresAt, } - if e.meta&bitFinTxn > 0 { + switch { + case e.meta&bitFinTxn > 0: txnTs, err := strconv.ParseUint(string(e.Value), 10, 64) if err != nil { return errors.Wrapf(err, "Unable to parse txn fin: %q", e.Value) @@ -160,7 +161,7 @@ func (db *DB) replayFunction() func(Entry, valuePointer) error { txn = txn[:0] lastCommit = 0 - } else if e.meta&bitTxn > 0 { + case e.meta&bitTxn > 0: txnTs := y.ParseTs(nk) if lastCommit == 0 { lastCommit = txnTs @@ -174,7 +175,7 @@ func (db *DB) replayFunction() func(Entry, valuePointer) error { te := txnEntry{nk: nk, v: v} txn = append(txn, te) - } else { + default: // This entry is from a rewrite. toLSM(nk, v) @@ -278,17 +279,6 @@ func Open(opt Options) (db *DB, err error) { elog = trace.NewEventLog("Badger", "DB") } - config := ristretto.Config{ - // Use 5% of cache memory for storing counters. - NumCounters: int64(float64(opt.MaxCacheSize) * 0.05 * 2), - MaxCost: int64(float64(opt.MaxCacheSize) * 0.95), - BufferItems: 64, - Metrics: true, - } - cache, err := ristretto.NewCache(&config) - if err != nil { - return nil, errors.Wrap(err, "failed to create cache") - } db = &DB{ imm: make([]*skl.Skiplist, 0, opt.NumMemtables), flushChan: make(chan flushTask, opt.NumMemtables), @@ -300,7 +290,20 @@ func Open(opt Options) (db *DB, err error) { valueDirGuard: valueDirLockGuard, orc: newOracle(opt), pub: newPublisher(), - blockCache: cache, + } + + if opt.MaxCacheSize > 0 { + config := ristretto.Config{ + // Use 5% of cache memory for storing counters. + NumCounters: int64(float64(opt.MaxCacheSize) * 0.05 * 2), + MaxCost: int64(float64(opt.MaxCacheSize) * 0.95), + BufferItems: 64, + Metrics: true, + } + db.blockCache, err = ristretto.NewCache(&config) + if err != nil { + return nil, errors.Wrap(err, "failed to create cache") + } } if db.opt.InMemory { @@ -386,7 +389,10 @@ func Open(opt Options) (db *DB, err error) { // CacheMetrics returns the metrics for the underlying cache. func (db *DB) CacheMetrics() *ristretto.Metrics { - return db.blockCache.Metrics + if db.blockCache != nil { + return db.blockCache.Metrics + } + return nil } // Close closes a DB. 
It's crucial to call it to ensure all the pending updates make their way to @@ -1050,9 +1056,10 @@ func (db *DB) calculateSize() { return err } ext := filepath.Ext(path) - if ext == ".sst" { + switch ext { + case ".sst": lsmSize += info.Size() - } else if ext == ".vlog" { + case ".vlog": vlogSize += info.Size() } return nil @@ -1209,11 +1216,12 @@ func (seq *Sequence) Release() error { func (seq *Sequence) updateLease() error { return seq.db.Update(func(txn *Txn) error { item, err := txn.Get(seq.key) - if err == ErrKeyNotFound { + switch { + case err == ErrKeyNotFound: seq.next = 0 - } else if err != nil { + case err != nil: return err - } else { + default: var num uint64 if err := item.Value(func(v []byte) error { num = binary.BigEndian.Uint64(v) @@ -1501,6 +1509,7 @@ func (db *DB) dropAll() (func(), error) { db.lc.nextFileID = 1 db.opt.Infof("Deleted %d value log files. DropAll done.\n", num) db.blockCache.Clear() + return resume, nil } diff --git a/db2_test.go b/db2_test.go index 3885f9aae..56d9ca145 100644 --- a/db2_test.go +++ b/db2_test.go @@ -23,6 +23,7 @@ import ( "fmt" "io/ioutil" "log" + "math" "math/rand" "os" "path" @@ -310,7 +311,7 @@ func TestPushValueLogLimit(t *testing.T) { for i := 0; i < 32; i++ { if i == 4 { - v := make([]byte, 2<<30) + v := make([]byte, math.MaxInt32) err := db.Update(func(txn *Txn) error { return txn.SetEntry(NewEntry([]byte(key(i)), v)) }) @@ -627,22 +628,29 @@ func TestL0GCBug(t *testing.T) { return []byte(fmt.Sprintf("%10d", i)) } val := []byte{1, 1, 1, 1, 1, 1, 1, 1} - // Insert 100 entries. This will create about 50 vlog files and 2 SST files. - for i := 0; i < 100; i++ { - err = db1.Update(func(txn *Txn) error { - return txn.SetEntry(NewEntry(key(i), val)) - }) - require.NoError(t, err) + // Insert 100 entries. This will create about 50*3 vlog files and 6 SST files. + for i := 0; i < 3; i++ { + for j := 0; j < 100; j++ { + err = db1.Update(func(txn *Txn) error { + return txn.SetEntry(NewEntry(key(j), val)) + }) + require.NoError(t, err) + } } // Run value log GC multiple times. This would ensure at least // one value log file is garbage collected. + success := 0 for i := 0; i < 10; i++ { err := db1.RunValueLogGC(0.01) + if err == nil { + success++ + } if err != nil && err != ErrNoRewrite { t.Fatalf(err.Error()) } } - + // Ensure alteast one GC call was successful. + require.NotZero(t, success) // CheckKeys reads all the keys previously stored. checkKeys := func(db *DB) { for i := 0; i < 100; i++ { @@ -665,7 +673,12 @@ func TestL0GCBug(t *testing.T) { if db1.valueDirGuard != nil { require.NoError(t, db1.valueDirGuard.release()) } - require.NoError(t, db1.vlog.Close()) + for _, f := range db1.vlog.filesMap { + require.NoError(t, f.fd.Close()) + } + require.NoError(t, db1.registry.Close()) + require.NoError(t, db1.lc.close()) + require.NoError(t, db1.manifest.close()) db2, err := Open(opts) require.NoError(t, err) @@ -723,7 +736,6 @@ func TestWindowsDataLoss(t *testing.T) { opt.Truncate = true db, err = Open(opt) require.NoError(t, err) - // Return after reading one entry. We're simulating a crash. // Simulate a crash by not closing db but releasing the locks. if db.dirLockGuard != nil { @@ -735,6 +747,12 @@ func TestWindowsDataLoss(t *testing.T) { // Don't use vlog.Close here. We don't want to fix the file size. Only un-mmap // the data so that we can truncate the file durning the next vlog.Open. 
require.NoError(t, y.Munmap(db.vlog.filesMap[db.vlog.maxFid].fmap)) + for _, f := range db.vlog.filesMap { + require.NoError(t, f.fd.Close()) + } + require.NoError(t, db.registry.Close()) + require.NoError(t, db.manifest.close()) + require.NoError(t, db.lc.close()) fmt.Println() fmt.Println("Third DB Open") diff --git a/db_test.go b/db_test.go index 03ea090dd..59781511a 100644 --- a/db_test.go +++ b/db_test.go @@ -23,10 +23,8 @@ import ( "flag" "fmt" "io/ioutil" - "log" "math" "math/rand" - "net/http" "os" "path/filepath" "runtime" @@ -289,6 +287,13 @@ func TestGet(t *testing.T) { test(t, db) require.NoError(t, db.Close()) }) + t.Run("cache disabled", func(t *testing.T) { + opts := DefaultOptions("").WithInMemory(true).WithMaxCacheSize(0) + db, err := Open(opts) + require.NoError(t, err) + test(t, db) + require.NoError(t, db.Close()) + }) } func TestGetAfterDelete(t *testing.T) { @@ -1161,6 +1166,9 @@ func TestExpiryImproperDBClose(t *testing.T) { // it would return Truncate Required Error. require.NoError(t, db0.vlog.Close()) + require.NoError(t, db0.registry.Close()) + require.NoError(t, db0.manifest.close()) + db1, err := Open(opt) require.NoError(t, err) err = db1.View(func(txn *Txn) error { @@ -1200,7 +1208,7 @@ func randBytes(n int) []byte { recv := make([]byte, n) in, err := rand.Read(recv) if err != nil { - log.Fatal(err) + panic(err) } return recv[:in] } @@ -1558,9 +1566,6 @@ func TestLSMOnly(t *testing.T) { opts.ValueLogMaxEntries = 100 db, err := Open(opts) require.NoError(t, err) - if err != nil { - t.Fatal(err) - } value := make([]byte, 128) _, err = rand.Read(value) @@ -1572,9 +1577,7 @@ func TestLSMOnly(t *testing.T) { db, err = Open(opts) require.NoError(t, err) - if err != nil { - t.Fatal(err) - } + defer db.Close() require.NoError(t, db.RunValueLogGC(0.2)) } @@ -1670,12 +1673,12 @@ func TestGoroutineLeak(t *testing.T) { func ExampleOpen() { dir, err := ioutil.TempDir("", "badger-test") if err != nil { - log.Fatal(err) + panic(err) } defer removeDir(dir) db, err := Open(DefaultOptions(dir)) if err != nil { - log.Fatal(err) + panic(err) } defer db.Close() @@ -1687,17 +1690,17 @@ func ExampleOpen() { }) if err != nil { - log.Fatal(err) + panic(err) } txn := db.NewTransaction(true) // Read-write txn err = txn.SetEntry(NewEntry([]byte("key"), []byte("value"))) if err != nil { - log.Fatal(err) + panic(err) } err = txn.Commit() if err != nil { - log.Fatal(err) + panic(err) } err = db.View(func(txn *Txn) error { @@ -1714,7 +1717,7 @@ func ExampleOpen() { }) if err != nil { - log.Fatal(err) + panic(err) } // Output: @@ -1725,13 +1728,13 @@ func ExampleOpen() { func ExampleTxn_NewIterator() { dir, err := ioutil.TempDir("", "badger-test") if err != nil { - log.Fatal(err) + panic(err) } defer removeDir(dir) db, err := Open(DefaultOptions(dir)) if err != nil { - log.Fatal(err) + panic(err) } defer db.Close() @@ -1749,13 +1752,13 @@ func ExampleTxn_NewIterator() { for i := 0; i < n; i++ { err := txn.SetEntry(NewEntry(bkey(i), bval(i))) if err != nil { - log.Fatal(err) + panic(err) } } err = txn.Commit() if err != nil { - log.Fatal(err) + panic(err) } opt := DefaultIteratorOptions @@ -1772,7 +1775,7 @@ func ExampleTxn_NewIterator() { return nil }) if err != nil { - log.Fatal(err) + panic(err) } fmt.Printf("Counted %d elements", count) // Output: @@ -1954,79 +1957,10 @@ func TestVerifyChecksum(t *testing.T) { } func TestMain(m *testing.M) { - // call flag.Parse() here if TestMain uses flags - go func() { - if err := http.ListenAndServe("localhost:8080", nil); err != nil { - 
log.Fatalf("Unable to open http port at 8080") - } - }() + flag.Parse() os.Exit(m.Run()) } -func ExampleDB_Subscribe() { - prefix := []byte{'a'} - - // This key should be printed, since it matches the prefix. - aKey := []byte("a-key") - aValue := []byte("a-value") - - // This key should not be printed. - bKey := []byte("b-key") - bValue := []byte("b-value") - - // Open the DB. - dir, err := ioutil.TempDir("", "badger-test") - if err != nil { - log.Fatal(err) - } - defer removeDir(dir) - db, err := Open(DefaultOptions(dir)) - if err != nil { - log.Fatal(err) - } - defer db.Close() - - // Create the context here so we can cancel it after sending the writes. - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - // Use the WaitGroup to make sure we wait for the subscription to stop before continuing. - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - cb := func(kvs *KVList) error { - for _, kv := range kvs.Kv { - fmt.Printf("%s is now set to %s\n", kv.Key, kv.Value) - } - return nil - } - if err := db.Subscribe(ctx, cb, prefix); err != nil && err != context.Canceled { - log.Fatal(err) - } - log.Printf("subscription closed") - }() - - // Wait for the above go routine to be scheduled. - time.Sleep(time.Second) - // Write both keys, but only one should be printed in the Output. - err = db.Update(func(txn *Txn) error { return txn.Set(aKey, aValue) }) - if err != nil { - log.Fatal(err) - } - err = db.Update(func(txn *Txn) error { return txn.Set(bKey, bValue) }) - if err != nil { - log.Fatal(err) - } - - log.Printf("stopping subscription") - cancel() - log.Printf("waiting for subscription to close") - wg.Wait() - // Output: - // a-key is now set to a-value -} - func removeDir(dir string) { if err := os.RemoveAll(dir); err != nil { panic(err) diff --git a/go.mod b/go.mod index f7f1fb0d3..eae04e485 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.12 require ( github.com/DataDog/zstd v1.4.1 github.com/cespare/xxhash v1.1.0 - github.com/dgraph-io/ristretto v0.0.0-20191025175511-c1f00be0418e + github.com/dgraph-io/ristretto v0.0.2-0.20200115201040-8f368f2f2ab3 github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 github.com/dustin/go-humanize v1.0.0 github.com/golang/protobuf v1.3.1 diff --git a/go.sum b/go.sum index 60e673a75..4c71dbdf4 100644 --- a/go.sum +++ b/go.sum @@ -13,8 +13,8 @@ github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwc github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/dgraph-io/ristretto v0.0.0-20191025175511-c1f00be0418e h1:aeUNgwup7PnDOBAD1BOKAqzb/W/NksOj6r3dwKKuqfg= -github.com/dgraph-io/ristretto v0.0.0-20191025175511-c1f00be0418e/go.mod h1:edzKIzGvqUCMzhTVWbiTSe75zD9Xxq0GtSBtFmaUTZs= +github.com/dgraph-io/ristretto v0.0.2-0.20200115201040-8f368f2f2ab3 h1:MQLRM35Pp0yAyBYksjbj1nZI/w6eyRY/mWoM1sFf4kU= +github.com/dgraph-io/ristretto v0.0.2-0.20200115201040-8f368f2f2ab3/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E= github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA= github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= diff --git a/level_handler.go b/level_handler.go 
index dbc2532ba..19ba0892b 100644 --- a/level_handler.go +++ b/level_handler.go @@ -188,7 +188,9 @@ func (s *levelHandler) tryAddLevel0Table(t *table.Table) bool { // Need lock as we may be deleting the first table during a level 0 compaction. s.Lock() defer s.Unlock() - if len(s.tables) >= s.db.opt.NumLevelZeroTablesStall { + // Return false only if L0 is in memory and number of tables is more than number of + // ZeroTableStall. For on disk L0, we should just add the tables to the level. + if s.db.opt.KeepL0InMemory && len(s.tables) >= s.db.opt.NumLevelZeroTablesStall { return false } diff --git a/levels.go b/levels.go index 0a4b92f26..41faf6e0b 100644 --- a/levels.go +++ b/levels.go @@ -44,12 +44,9 @@ type levelsController struct { kv *DB cstatus compactStatus -} - -var ( // This is for getting timings between stalls. lastUnstalled time.Time -) +} // revertToManifest checks that all necessary table files exist and removes all table files not // referenced by the manifest. idMap is a set of table file id's that were read from the directory @@ -87,12 +84,13 @@ func newLevelsController(db *DB, mf *Manifest) (*levelsController, error) { for i := 0; i < db.opt.MaxLevels; i++ { s.levels[i] = newLevelHandler(db, i) - if i == 0 { + switch i { + case 0: // Do nothing. - } else if i == 1 { + case 1: // Level 1 probably shouldn't be too much bigger than level 0. s.levels[i].maxTotalSize = db.opt.LevelOneSize - } else { + default: s.levels[i].maxTotalSize = s.levels[i-1].maxTotalSize * int64(db.opt.LevelSizeMultiplier) } s.cstatus.levels[i] = new(levelCompactStatus) @@ -363,12 +361,15 @@ func (s *levelsController) runWorker(lc *y.Closer) { // Can add a done channel or other stuff. case <-ticker.C: prios := s.pickCompactLevels() + loop: for _, p := range prios { - if err := s.doCompact(p); err == nil { - break - } else if err == errFillTables { + err := s.doCompact(p) + switch err { + case nil: + break loop + case errFillTables: // pass - } else { + default: s.kv.opt.Warningf("While running doCompact: %v\n", err) } } @@ -424,34 +425,42 @@ func (s *levelsController) pickCompactLevels() (prios []compactionPriority) { prios = append(prios, pri) } } - sort.Slice(prios, func(i, j int) bool { - return prios[i].score > prios[j].score - }) + // We used to sort compaction priorities based on the score. But, we + // decided to compact based on the level, not the priority. So, upper + // levels (level 0, level 1, etc) always get compacted first, before the + // lower levels -- this allows us to avoid stalls. return prios } -// compactBuildTables merge topTables and botTables to form a list of new tables. +// checkOverlap checks if the given tables overlap with any level from the given "lev" onwards. +func (s *levelsController) checkOverlap(tables []*table.Table, lev int) bool { + kr := getKeyRange(tables...) + for i, lh := range s.levels { + if i < lev { // Skip upper levels. + continue + } + lh.RLock() + left, right := lh.overlappingTables(levelHandlerRLocked{}, kr) + lh.RUnlock() + if right-left > 0 { + return true + } + } + return false +} + +// compactBuildTables merges topTables and botTables to form a list of new tables. func (s *levelsController) compactBuildTables( lev int, cd compactDef) ([]*table.Table, func() error, error) { topTables := cd.top botTables := cd.bot - var hasOverlap bool - { - kr := getKeyRange(cd.top...) - for i, lh := range s.levels { - if i <= lev { // Skip upper levels. 
- continue - } - lh.RLock() - left, right := lh.overlappingTables(levelHandlerRLocked{}, kr) - lh.RUnlock() - if right-left > 0 { - hasOverlap = true - break - } - } - } + // Check overlap of the top level with the levels which are not being + // compacted in this compaction. We don't need to check overlap of the bottom + // tables with other levels because if the top tables overlap with any of the lower + // levels, it implies bottom level also overlaps because top and bottom tables + // overlap with each other. + hasOverlap := s.checkOverlap(cd.top, cd.nextLevel.level+1) // Try to collect stats so that we can inform value log about GC. That would help us find which // value log file should be GCed. @@ -470,9 +479,10 @@ func (s *levelsController) compactBuildTables( // Create iterators across all the tables involved first. var iters []y.Iterator - if lev == 0 { + switch { + case lev == 0: iters = appendIteratorsReversed(iters, topTables, false) - } else if len(topTables) > 0 { + case len(topTables) > 0: y.AssertTrue(len(topTables) == 1) iters = []y.Iterator{topTables[0].NewIterator(false)} } @@ -561,22 +571,28 @@ func (s *levelsController) compactBuildTables( // versions which are below the minReadTs, otherwise, we might end up discarding the // only valid version for a running transaction. numVersions++ - lastValidVersion := vs.Meta&bitDiscardEarlierVersions > 0 - if isDeletedOrExpired(vs.Meta, vs.ExpiresAt) || - numVersions > s.kv.opt.NumVersionsToKeep || - lastValidVersion { + + // Keep the current version and discard all the next versions if + // - The `discardEarlierVersions` bit is set OR + // - We've already processed `NumVersionsToKeep` number of versions + // (including the current item being processed) + lastValidVersion := vs.Meta&bitDiscardEarlierVersions > 0 || + numVersions == s.kv.opt.NumVersionsToKeep + + if isDeletedOrExpired(vs.Meta, vs.ExpiresAt) || lastValidVersion { // If this version of the key is deleted or expired, skip all the rest of the // versions. Ensure that we're only removing versions below readTs. skipKey = y.SafeCopy(skipKey, it.Key()) - if lastValidVersion { + switch { + case lastValidVersion: // Add this key. We have set skipKey, so the following key versions // would be skipped. - } else if hasOverlap { + case hasOverlap: // If this key range has overlap with lower levels, then keep the deletion // marker with the latest version, discarding the rest. We have set skipKey, // so the following key versions would be skipped. - } else { + default: // If no overlap, we can skip all the versions, by continuing here. numSkips++ updateStats(vs) @@ -916,7 +932,7 @@ func (s *levelsController) addLevel0Table(t *table.Table) error { // Stall. Make sure all levels are healthy before we unstall. var timeStart time.Time { - s.elog.Printf("STALLED STALLED STALLED: %v\n", time.Since(lastUnstalled)) + s.elog.Printf("STALLED STALLED STALLED: %v\n", time.Since(s.lastUnstalled)) s.cstatus.RLock() for i := 0; i < s.kv.opt.MaxLevels; i++ { s.elog.Printf("level=%d. Status=%s Size=%d\n", @@ -925,15 +941,13 @@ func (s *levelsController) addLevel0Table(t *table.Table) error { s.cstatus.RUnlock() timeStart = time.Now() } - // Before we unstall, we need to make sure that level 0 and 1 are healthy. Otherwise, we - // will very quickly fill up level 0 again and if the compaction strategy favors level 0, - // then level 1 is going to super full. + // Before we unstall, we need to make sure that level 0 is healthy. Otherwise, we + // will very quickly fill up level 0 again. 
for i := 0; ; i++ { - // Passing 0 for delSize to compactable means we're treating incomplete compactions as - // not having finished -- we wait for them to finish. Also, it's crucial this behavior - // replicates pickCompactLevels' behavior in computing compactability in order to - // guarantee progress. - if !s.isLevel0Compactable() && !s.levels[1].isCompactable(0) { + // It's crucial that this behavior replicates pickCompactLevels' behavior in + // computing compactability in order to guarantee progress. + // Break the loop once L0 has enough space to accommodate new tables. + if !s.isLevel0Compactable() { break } time.Sleep(10 * time.Millisecond) @@ -945,7 +959,7 @@ func (s *levelsController) addLevel0Table(t *table.Table) error { } { s.elog.Printf("UNSTALLED UNSTALLED UNSTALLED: %v\n", time.Since(timeStart)) - lastUnstalled = time.Now() + s.lastUnstalled = time.Now() } } diff --git a/levels_test.go b/levels_test.go new file mode 100644 index 000000000..688e3d64d --- /dev/null +++ b/levels_test.go @@ -0,0 +1,567 @@ +/* + * Copyright 2019 Dgraph Labs, Inc. and Contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package badger + +import ( + "math" + "testing" + "time" + + "github.com/dgraph-io/badger/v2/options" + "github.com/dgraph-io/badger/v2/pb" + "github.com/dgraph-io/badger/v2/table" + "github.com/dgraph-io/badger/v2/y" + "github.com/stretchr/testify/require" +) + +// createAndOpen creates a table with the given data and adds it to the given level. +func createAndOpen(db *DB, td []keyValVersion, level int) { + opts := table.Options{ + BlockSize: db.opt.BlockSize, + BloomFalsePositive: db.opt.BloomFalsePositive, + LoadingMode: options.LoadToRAM, + ChkMode: options.NoVerification, + } + b := table.NewTableBuilder(opts) + + // Add all keys and versions to the table. + for _, item := range td { + key := y.KeyWithTs([]byte(item.key), uint64(item.version)) + val := y.ValueStruct{Value: []byte(item.val), Meta: item.meta} + b.Add(key, val, 0) + } + fd, err := y.CreateSyncedFile(table.NewFilename(db.lc.reserveFileID(), db.opt.Dir), true) + if err != nil { + panic(err) + } + + if _, err = fd.Write(b.Finish()); err != nil { + panic(err) + } + tab, err := table.OpenTable(fd, opts) + if err != nil { + panic(err) + } + if err := db.manifest.addChanges([]*pb.ManifestChange{ + newCreateChange(tab.ID(), level, 0, tab.CompressionType()), + }); err != nil { + panic(err) + } + // Add table to the given level. + db.lc.levels[level].tables = append(db.lc.levels[level].tables, tab) +} + +type keyValVersion struct { + key string + val string + version int + meta byte +} + +func TestCheckOverlap(t *testing.T) { + t.Run("overlap", func(t *testing.T) { + // This test consists of one table on level 0 and one on level 1. + // There is an overlap amongst the tables but there is no overlap + // with rest of the levels. 
+ t.Run("same keys", func(t *testing.T) { + runBadgerTest(t, nil, func(t *testing.T, db *DB) { + l0 := []keyValVersion{{"foo", "bar", 3, 0}} + l1 := []keyValVersion{{"foo", "bar", 2, 0}} + createAndOpen(db, l0, 0) + createAndOpen(db, l1, 1) + + // Level 0 should overlap with level 0 tables. + require.True(t, db.lc.checkOverlap(db.lc.levels[0].tables, 0)) + // Level 1 should overlap with level 0 tables (they have the same keys). + require.True(t, db.lc.checkOverlap(db.lc.levels[0].tables, 1)) + // Level 2 and 3 should not overlap with level 0 tables. + require.False(t, db.lc.checkOverlap(db.lc.levels[0].tables, 2)) + require.False(t, db.lc.checkOverlap(db.lc.levels[1].tables, 2)) + require.False(t, db.lc.checkOverlap(db.lc.levels[0].tables, 3)) + require.False(t, db.lc.checkOverlap(db.lc.levels[1].tables, 3)) + + }) + }) + t.Run("overlapping keys", func(t *testing.T) { + runBadgerTest(t, nil, func(t *testing.T, db *DB) { + l0 := []keyValVersion{{"a", "x", 1, 0}, {"b", "x", 1, 0}, {"foo", "bar", 3, 0}} + l1 := []keyValVersion{{"foo", "bar", 2, 0}} + createAndOpen(db, l0, 0) + createAndOpen(db, l1, 1) + + // Level 0 should overlap with level 0 tables. + require.True(t, db.lc.checkOverlap(db.lc.levels[0].tables, 0)) + require.True(t, db.lc.checkOverlap(db.lc.levels[1].tables, 1)) + // Level 1 should overlap with level 0 tables, "foo" key is common. + require.True(t, db.lc.checkOverlap(db.lc.levels[0].tables, 1)) + // Level 2 and 3 should not overlap with level 0 tables. + require.False(t, db.lc.checkOverlap(db.lc.levels[0].tables, 2)) + require.False(t, db.lc.checkOverlap(db.lc.levels[0].tables, 3)) + }) + }) + }) + t.Run("non-overlapping", func(t *testing.T) { + runBadgerTest(t, nil, func(t *testing.T, db *DB) { + l0 := []keyValVersion{{"a", "x", 1, 0}, {"b", "x", 1, 0}, {"c", "bar", 3, 0}} + l1 := []keyValVersion{{"foo", "bar", 2, 0}} + createAndOpen(db, l0, 0) + createAndOpen(db, l1, 1) + + // Level 1 should not overlap with level 0 tables + require.False(t, db.lc.checkOverlap(db.lc.levels[0].tables, 1)) + // Level 2 and 3 should not overlap with level 0 tables. + require.False(t, db.lc.checkOverlap(db.lc.levels[0].tables, 2)) + require.False(t, db.lc.checkOverlap(db.lc.levels[0].tables, 3)) + }) + }) +} + +func getAllAndCheck(t *testing.T, db *DB, expected []keyValVersion) { + db.View(func(txn *Txn) error { + opt := DefaultIteratorOptions + opt.AllVersions = true + opt.InternalAccess = true + it := txn.NewIterator(opt) + defer it.Close() + i := 0 + for it.Rewind(); it.Valid(); it.Next() { + require.Less(t, i, len(expected), "DB has more number of key than expected") + item := it.Item() + v, err := item.ValueCopy(nil) + require.NoError(t, err) + // fmt.Printf("k: %s v: %d val: %s\n", item.key, item.Version(), v) + expect := expected[i] + require.Equal(t, expect.key, string(item.Key()), "expected key: %s actual key: %s", + expect.key, item.Key()) + require.Equal(t, expect.val, string(v), "key: %s expected value: %s actual %s", + item.key, expect.val, v) + require.Equal(t, expect.version, int(item.Version()), + "key: %s expected version: %d actual %d", item.key, expect.version, item.Version()) + require.Equal(t, expect.meta, item.meta, + "key: %s expected meta: %d meta %d", item.key, expect.meta, item.meta) + i++ + } + require.Equal(t, len(expected), i, "keys examined should be equal to keys expected") + return nil + }) + +} + +func TestCompaction(t *testing.T) { + // Disable compactions and keep single version of each key. 
+ opt := DefaultOptions("").WithNumCompactors(0).WithNumVersionsToKeep(1) + opt.managedTxns = true + t.Run("level 0 to level 1", func(t *testing.T) { + runBadgerTest(t, &opt, func(t *testing.T, db *DB) { + l0 := []keyValVersion{{"foo", "bar", 3, 0}, {"fooz", "baz", 1, 0}} + l01 := []keyValVersion{{"foo", "bar", 2, 0}} + l1 := []keyValVersion{{"foo", "bar", 1, 0}} + // Level 0 has table l0 and l01. + createAndOpen(db, l0, 0) + createAndOpen(db, l01, 0) + // Level 1 has table l1. + createAndOpen(db, l1, 1) + + // Set a high discard timestamp so that all the keys are below the discard timestamp. + db.SetDiscardTs(10) + + getAllAndCheck(t, db, []keyValVersion{ + {"foo", "bar", 3, 0}, {"foo", "bar", 2, 0}, + {"foo", "bar", 1, 0}, {"fooz", "baz", 1, 0}, + }) + cdef := compactDef{ + thisLevel: db.lc.levels[0], + nextLevel: db.lc.levels[1], + top: db.lc.levels[0].tables, + bot: db.lc.levels[1].tables, + } + require.NoError(t, db.lc.runCompactDef(0, cdef)) + // foo version 2 should be dropped after compaction. + getAllAndCheck(t, db, []keyValVersion{{"foo", "bar", 3, 0}, {"fooz", "baz", 1, 0}}) + }) + }) + + t.Run("level 0 to level 1 with lower overlap", func(t *testing.T) { + runBadgerTest(t, &opt, func(t *testing.T, db *DB) { + l0 := []keyValVersion{{"foo", "bar", 3, 0}, {"fooz", "baz", 1, 0}} + l01 := []keyValVersion{{"foo", "bar", 2, 0}} + l1 := []keyValVersion{{"foo", "bar", 1, 0}} + l2 := []keyValVersion{{"foo", "bar", 0, 0}} + // Level 0 has table l0 and l01. + createAndOpen(db, l0, 0) + createAndOpen(db, l01, 0) + // Level 1 has table l1. + createAndOpen(db, l1, 1) + // Level 2 has table l2. + createAndOpen(db, l2, 2) + + // Set a high discard timestamp so that all the keys are below the discard timestamp. + db.SetDiscardTs(10) + + getAllAndCheck(t, db, []keyValVersion{ + {"foo", "bar", 3, 0}, {"foo", "bar", 2, 0}, {"foo", "bar", 1, 0}, + {"foo", "bar", 0, 0}, {"fooz", "baz", 1, 0}, + }) + cdef := compactDef{ + thisLevel: db.lc.levels[0], + nextLevel: db.lc.levels[1], + top: db.lc.levels[0].tables, + bot: db.lc.levels[1].tables, + } + require.NoError(t, db.lc.runCompactDef(0, cdef)) + // foo version 2 and version 1 should be dropped after compaction. + getAllAndCheck(t, db, []keyValVersion{ + {"foo", "bar", 3, 0}, {"foo", "bar", 0, 0}, {"fooz", "baz", 1, 0}, + }) + }) + }) + + t.Run("level 1 to level 2", func(t *testing.T) { + runBadgerTest(t, &opt, func(t *testing.T, db *DB) { + l1 := []keyValVersion{{"foo", "bar", 3, 0}, {"fooz", "baz", 1, 0}} + l2 := []keyValVersion{{"foo", "bar", 2, 0}} + createAndOpen(db, l1, 1) + createAndOpen(db, l2, 2) + + // Set a high discard timestamp so that all the keys are below the discard timestamp. + db.SetDiscardTs(10) + + getAllAndCheck(t, db, []keyValVersion{ + {"foo", "bar", 3, 0}, {"foo", "bar", 2, 0}, {"fooz", "baz", 1, 0}, + }) + cdef := compactDef{ + thisLevel: db.lc.levels[1], + nextLevel: db.lc.levels[2], + top: db.lc.levels[1].tables, + bot: db.lc.levels[2].tables, + } + require.NoError(t, db.lc.runCompactDef(1, cdef)) + // foo version 2 should be dropped after compaction. + getAllAndCheck(t, db, []keyValVersion{{"foo", "bar", 3, 0}, {"fooz", "baz", 1, 0}}) + }) + }) +} + +func TestHeadKeyCleanup(t *testing.T) { + // Disable compactions and keep single version of each key. 
+ opt := DefaultOptions("").WithNumCompactors(0).WithNumVersionsToKeep(1) + opt.managedTxns = true + + runBadgerTest(t, &opt, func(t *testing.T, db *DB) { + l0 := []keyValVersion{ + {string(head), "foo", 5, 0}, {string(head), "bar", 4, 0}, {string(head), "baz", 3, 0}, + } + l1 := []keyValVersion{{string(head), "fooz", 2, 0}, {string(head), "foozbaz", 1, 0}} + // Level 0 has table l0 and l01. + createAndOpen(db, l0, 0) + // Level 1 has table l1. + createAndOpen(db, l1, 1) + + // Set a high discard timestamp so that all the keys are below the discard timestamp. + db.SetDiscardTs(10) + + getAllAndCheck(t, db, []keyValVersion{ + {string(head), "foo", 5, 0}, {string(head), "bar", 4, 0}, {string(head), "baz", 3, 0}, + {string(head), "fooz", 2, 0}, {string(head), "foozbaz", 1, 0}, + }) + cdef := compactDef{ + thisLevel: db.lc.levels[0], + nextLevel: db.lc.levels[1], + top: db.lc.levels[0].tables, + bot: db.lc.levels[1].tables, + } + require.NoError(t, db.lc.runCompactDef(0, cdef)) + // foo version 2 should be dropped after compaction. + getAllAndCheck(t, db, []keyValVersion{{string(head), "foo", 5, 0}}) + }) +} + +func TestDiscardTs(t *testing.T) { + // Disable compactions and keep single version of each key. + opt := DefaultOptions("").WithNumCompactors(0).WithNumVersionsToKeep(1) + opt.managedTxns = true + + t.Run("all keys above discardTs", func(t *testing.T) { + runBadgerTest(t, &opt, func(t *testing.T, db *DB) { + l0 := []keyValVersion{{"foo", "bar", 4, 0}, {"fooz", "baz", 3, 0}} + l01 := []keyValVersion{{"foo", "bar", 3, 0}} + l1 := []keyValVersion{{"foo", "bar", 2, 0}} + // Level 0 has table l0 and l01. + createAndOpen(db, l0, 0) + createAndOpen(db, l01, 0) + // Level 1 has table l1. + createAndOpen(db, l1, 1) + + // Set dicardTs to 1. All the keys are above discardTs. + db.SetDiscardTs(1) + + getAllAndCheck(t, db, []keyValVersion{ + {"foo", "bar", 4, 0}, {"foo", "bar", 3, 0}, + {"foo", "bar", 2, 0}, {"fooz", "baz", 3, 0}, + }) + cdef := compactDef{ + thisLevel: db.lc.levels[0], + nextLevel: db.lc.levels[1], + top: db.lc.levels[0].tables, + bot: db.lc.levels[1].tables, + } + require.NoError(t, db.lc.runCompactDef(0, cdef)) + // No keys should be dropped. + getAllAndCheck(t, db, []keyValVersion{ + {"foo", "bar", 4, 0}, {"foo", "bar", 3, 0}, + {"foo", "bar", 2, 0}, {"fooz", "baz", 3, 0}, + }) + }) + }) + t.Run("some keys above discardTs", func(t *testing.T) { + runBadgerTest(t, &opt, func(t *testing.T, db *DB) { + l0 := []keyValVersion{ + {"foo", "bar", 4, 0}, {"foo", "bar", 3, 0}, + {"foo", "bar", 2, 0}, {"fooz", "baz", 2, 0}, + } + l1 := []keyValVersion{{"foo", "bbb", 1, 0}} + createAndOpen(db, l0, 0) + createAndOpen(db, l1, 1) + + // Set dicardTs to 3. foo2 and foo1 should be dropped. + db.SetDiscardTs(3) + + getAllAndCheck(t, db, []keyValVersion{ + {"foo", "bar", 4, 0}, {"foo", "bar", 3, 0}, {"foo", "bar", 2, 0}, + {"foo", "bbb", 1, 0}, {"fooz", "baz", 2, 0}, + }) + cdef := compactDef{ + thisLevel: db.lc.levels[0], + nextLevel: db.lc.levels[1], + top: db.lc.levels[0].tables, + bot: db.lc.levels[1].tables, + } + require.NoError(t, db.lc.runCompactDef(0, cdef)) + // foo1 and foo2 should be dropped. 
+ getAllAndCheck(t, db, []keyValVersion{
+ {"foo", "bar", 4, 0}, {"foo", "bar", 3, 0}, {"fooz", "baz", 2, 0},
+ })
+ })
+ })
+ t.Run("all keys below discardTs", func(t *testing.T) {
+ runBadgerTest(t, &opt, func(t *testing.T, db *DB) {
+ l0 := []keyValVersion{{"foo", "bar", 4, 0}, {"fooz", "baz", 3, 0}}
+ l01 := []keyValVersion{{"foo", "bar", 3, 0}}
+ l1 := []keyValVersion{{"foo", "bar", 2, 0}}
+ // Level 0 has table l0 and l01.
+ createAndOpen(db, l0, 0)
+ createAndOpen(db, l01, 0)
+ // Level 1 has table l1.
+ createAndOpen(db, l1, 1)
+
+ // Set discardTs to 10. All the keys are below discardTs.
+ db.SetDiscardTs(10)
+
+ getAllAndCheck(t, db, []keyValVersion{
+ {"foo", "bar", 4, 0}, {"foo", "bar", 3, 0},
+ {"foo", "bar", 2, 0}, {"fooz", "baz", 3, 0},
+ })
+ cdef := compactDef{
+ thisLevel: db.lc.levels[0],
+ nextLevel: db.lc.levels[1],
+ top: db.lc.levels[0].tables,
+ bot: db.lc.levels[1].tables,
+ }
+ require.NoError(t, db.lc.runCompactDef(0, cdef))
+ // Only one version of every key should be left.
+ getAllAndCheck(t, db, []keyValVersion{{"foo", "bar", 4, 0}, {"fooz", "baz", 3, 0}})
+ })
+ })
+}
+
+// This is a test to ensure that the first entry with the bitDiscardEarlierVersions bit set and
+// version below DiscardTs is kept around (when NumVersionsToKeep is infinite).
+func TestDiscardFirstVersion(t *testing.T) {
+ opt := DefaultOptions("")
+ opt.NumCompactors = 0
+ opt.NumVersionsToKeep = math.MaxInt32
+ opt.managedTxns = true
+
+ runBadgerTest(t, &opt, func(t *testing.T, db *DB) {
+ l0 := []keyValVersion{{"foo", "bar", 1, 0}}
+ l01 := []keyValVersion{{"foo", "bar", 2, bitDiscardEarlierVersions}}
+ l02 := []keyValVersion{{"foo", "bar", 3, 0}}
+ l03 := []keyValVersion{{"foo", "bar", 4, 0}}
+ l04 := []keyValVersion{{"foo", "bar", 9, 0}}
+ l05 := []keyValVersion{{"foo", "bar", 10, bitDiscardEarlierVersions}}
+
+ // Level 0 has all the tables.
+ createAndOpen(db, l0, 0)
+ createAndOpen(db, l01, 0)
+ createAndOpen(db, l02, 0)
+ createAndOpen(db, l03, 0)
+ createAndOpen(db, l04, 0)
+ createAndOpen(db, l05, 0)
+
+ // Discard timestamp is set to 7.
+ db.SetDiscardTs(7)
+
+ // Compact L0 to L1.
+ cdef := compactDef{
+ thisLevel: db.lc.levels[0],
+ nextLevel: db.lc.levels[1],
+ top: db.lc.levels[0].tables,
+ bot: db.lc.levels[1].tables,
+ }
+ require.NoError(t, db.lc.runCompactDef(0, cdef))
+
+ // - Versions 10 and 9 lie above version 7 so they should be there.
+ // - Versions 4, 3 and 2 lie below the discardTs but they don't have the
+ // "bitDiscardEarlierVersions" bit set, so they should not be removed because the number
+ // of versions to keep is set to infinite.
+ // - Version 1 is below DiscardTs and below the first "bitDiscardEarlierVersions"
+ // marker so IT WILL BE REMOVED.
+ ExpectedKeys := []keyValVersion{
+ {"foo", "bar", 10, bitDiscardEarlierVersions},
+ {"foo", "bar", 9, 0},
+ {"foo", "bar", 4, 0},
+ {"foo", "bar", 3, 0},
+ {"foo", "bar", 2, bitDiscardEarlierVersions}}
+
+ getAllAndCheck(t, db, ExpectedKeys)
+ })
+}
+
+// This test ensures we don't stall when L1's size is greater than opt.LevelOneSize.
+// We should stall only when the number of L0 tables is more than opt.NumLevelZeroTablesStall.
+func TestL1Stall(t *testing.T) {
+ opt := DefaultOptions("")
+ // Disable all compactions.
+ opt.NumCompactors = 0
+ // Number of level zero tables.
+ opt.NumLevelZeroTables = 3
+ // Addition of new tables will stall if there are 4 or more L0 tables.
+ opt.NumLevelZeroTablesStall = 4
+ // Level 1 size is 10 bytes.
+ opt.LevelOneSize = 10
+
+ runBadgerTest(t, &opt, func(t *testing.T, db *DB) {
+ // Level 0 has 4 tables.
+ db.lc.levels[0].Lock() + db.lc.levels[0].tables = []*table.Table{createEmptyTable(db), createEmptyTable(db), + createEmptyTable(db), createEmptyTable(db)} + db.lc.levels[0].Unlock() + + timeout := time.After(5 * time.Second) + done := make(chan bool) + + // This is important. Set level 1 size more than the opt.LevelOneSize (we've set it to 10). + db.lc.levels[1].totalSize = 100 + go func() { + tab := createEmptyTable(db) + require.NoError(t, db.lc.addLevel0Table(tab)) + tab.DecrRef() + done <- true + }() + time.Sleep(time.Second) + + db.lc.levels[0].Lock() + // Drop two tables from Level 0 so that addLevel0Table can make progress. Earlier table + // count was 4 which is equal to L0 stall count. + toDrop := db.lc.levels[0].tables[:2] + decrRefs(toDrop) + db.lc.levels[0].tables = db.lc.levels[0].tables[2:] + db.lc.levels[0].Unlock() + + select { + case <-timeout: + t.Fatal("Test didn't finish in time") + case <-done: + } + }) +} + +func createEmptyTable(db *DB) *table.Table { + opts := table.Options{ + BloomFalsePositive: db.opt.BloomFalsePositive, + LoadingMode: options.LoadToRAM, + ChkMode: options.NoVerification, + } + b := table.NewTableBuilder(opts) + // Add one key so that we can open this table. + b.Add(y.KeyWithTs([]byte("foo"), 1), y.ValueStruct{}, 0) + + // Open table in memory to avoid adding changes to manifest file. + tab, err := table.OpenInMemoryTable(b.Finish(), db.lc.reserveFileID(), &opts) + if err != nil { + panic(err) + } + + return tab +} + +func TestL0Stall(t *testing.T) { + test := func(t *testing.T, opt *Options) { + runBadgerTest(t, opt, func(t *testing.T, db *DB) { + db.lc.levels[0].Lock() + // Add NumLevelZeroTableStall+1 number of tables to level 0. This would fill up level + // zero and all new additions are expected to stall if L0 is in memory. + for i := 0; i < opt.NumLevelZeroTablesStall+1; i++ { + db.lc.levels[0].tables = append(db.lc.levels[0].tables, createEmptyTable(db)) + } + db.lc.levels[0].Unlock() + + timeout := time.After(5 * time.Second) + done := make(chan bool) + + go func() { + tab := createEmptyTable(db) + require.NoError(t, db.lc.addLevel0Table(tab)) + tab.DecrRef() + done <- true + }() + // Let it stall for a second. + time.Sleep(time.Second) + + select { + case <-timeout: + if opt.KeepL0InMemory { + t.Log("Timeout triggered") + // Mark this test as successful since L0 is in memory and the + // addition of new table to L0 is supposed to stall. + } else { + t.Fatal("Test didn't finish in time") + } + case <-done: + // The test completed before 5 second timeout. Mark it as successful. + } + }) + } + + opt := DefaultOptions("") + opt.EventLogging = false + // Disable all compactions. + opt.NumCompactors = 0 + // Number of level zero tables. + opt.NumLevelZeroTables = 3 + // Addition of new tables will stall if there are 4 or more L0 tables. 
+ opt.NumLevelZeroTablesStall = 4
+
+ t.Run("with KeepL0InMemory", func(t *testing.T) {
+ opt.KeepL0InMemory = true
+ test(t, &opt)
+ })
+ t.Run("with L0 on disk", func(t *testing.T) {
+ opt.KeepL0InMemory = false
+ test(t, &opt)
+ })
+}
diff --git a/manifest_test.go b/manifest_test.go
index c9a51fc60..5062b3f1b 100644
--- a/manifest_test.go
+++ b/manifest_test.go
@@ -168,6 +168,7 @@ func TestOverlappingKeyRangeError(t *testing.T) {
 defer removeDir(dir)
 kv, err := Open(DefaultOptions(dir))
 require.NoError(t, err)
+ defer kv.Close()
 lh0 := newLevelHandler(kv, 0)
 lh1 := newLevelHandler(kv, 1)
diff --git a/options.go b/options.go
index 4374fc39d..4fbe09199 100644
--- a/options.go
+++ b/options.go
@@ -21,7 +21,6 @@ import (
 "github.com/dgraph-io/badger/v2/options"
 "github.com/dgraph-io/badger/v2/table"
- "github.com/dgraph-io/badger/v2/y"
 )
 // Note: If you add a new option X make sure you also add a WithX method on Options.
@@ -102,11 +101,6 @@ type Options struct {
 // DefaultOptions sets a list of recommended options for good performance.
 // Feel free to modify these to suit your needs with the WithX methods.
 func DefaultOptions(path string) Options {
- defaultCompression := options.ZSTD
- // Use snappy as default compression algorithm if badger is built without CGO.
- if !y.CgoEnabled {
- defaultCompression = options.Snappy
- }
 return Options{
 Dir: path,
 ValueDir: path,
@@ -129,16 +123,19 @@ func DefaultOptions(path string) Options {
 CompactL0OnClose: true,
 KeepL0InMemory: true,
 VerifyValueChecksum: false,
- Compression: defaultCompression,
+ Compression: options.None,
 MaxCacheSize: 1 << 30, // 1 GB
- // Benchmarking compression level against performance showed that level 15 gives
- // the best speed vs ratio tradeoff.
- // For a data size of 4KB we get
- // Level: 3 Ratio: 2.72 Time: 24112 n/s
- // Level: 10 Ratio: 2.95 Time: 75655 n/s
- // Level: 15 Ratio: 4.38 Time: 239042 n/s
- // See https://github.com/dgraph-io/badger/pull/1111#issue-338120757
- ZSTDCompressionLevel: 15,
+ // The following benchmarks were done on a 4 KB block size (default block size). The
+ // compression ratio is supposed to increase with increasing compression level but since the
+ // input for the compression algorithm is small (4 KB), we don't get significant benefit at
+ // level 3.
+ // no_compression-16 10 502848865 ns/op 165.46 MB/s -
+ // zstd_compression/level_1-16 7 739037966 ns/op 112.58 MB/s 2.93
+ // zstd_compression/level_3-16 7 756950250 ns/op 109.91 MB/s 2.72
+ // zstd_compression/level_15-16 1 11135686219 ns/op 7.47 MB/s 4.38
+ // Benchmark code can be found in the table/builder_test.go file.
+ ZSTDCompressionLevel: 1,
+
 // Nothing to read/write value log using standard File I/O
 // MemoryMap to mmap() the value log files
 // (2^30 - 1)*2 when mmapping < 2^31 - 1, max int32.
@@ -539,7 +536,8 @@ func (opt Options) WithChecksumVerificationMode(cvMode options.ChecksumVerificat
 // WithMaxCacheSize returns a new Options value with MaxCacheSize set to the given value.
 //
 // This value specifies how much data cache should hold in memory. A small size of cache means lower
-// memory consumption and lookups/iterations would take longer.
+// memory consumption and lookups/iterations would take longer. Setting size to zero disables the
+// cache altogether.
 func (opt Options) WithMaxCacheSize(size int64) Options {
 opt.MaxCacheSize = size
 return opt
@@ -560,7 +558,18 @@ func (opt Options) WithInMemory(b bool) Options {
 // The ZSTD compression algorithm supports 20 compression levels. The higher the compression
 // level, the better is the compression ratio but lower is the performance. Lower levels
 // have better performance and higher levels have better compression ratios.
-// The default value of ZSTDCompressionLevel is 15.
+// We recommend using ZSTD compression level 1. Any level higher than 1 seems to
+// deteriorate badger's performance.
+// The following benchmarks were done on a 4 KB block size (default block size). The compression
+// ratio is supposed to increase with increasing compression level but since the input for the
+// compression algorithm is small (4 KB), we don't get significant benefit at level 3. It is
+// advised to write your own benchmarks before choosing a compression algorithm or level.
+//
+// no_compression-16 10 502848865 ns/op 165.46 MB/s -
+// zstd_compression/level_1-16 7 739037966 ns/op 112.58 MB/s 2.93
+// zstd_compression/level_3-16 7 756950250 ns/op 109.91 MB/s 2.72
+// zstd_compression/level_15-16 1 11135686219 ns/op 7.47 MB/s 4.38
+// Benchmark code can be found in the table/builder_test.go file.
 func (opt Options) WithZSTDCompressionLevel(cLevel int) Options {
 opt.ZSTDCompressionLevel = cLevel
 return opt
diff --git a/skl/skl.go b/skl/skl.go
index cdfc599be..43694f14b 100644
--- a/skl/skl.go
+++ b/skl/skl.go
@@ -34,11 +34,11 @@ package skl
 import (
 "math"
- "math/rand"
 "sync/atomic"
 "unsafe"

 "github.com/dgraph-io/badger/v2/y"
+ "github.com/dgraph-io/ristretto/z"
 )
 const (
@@ -165,9 +165,9 @@ func (s *node) casNextOffset(h int, old, val uint32) bool {
 // return n != nil && y.CompareKeys(key, n.key) > 0
 //}
-func randomHeight() int {
+func (s *Skiplist) randomHeight() int {
 h := 1
- for h < maxHeight && rand.Uint32() <= heightIncrease {
+ for h < maxHeight && z.FastRand() <= heightIncrease {
 h++
 }
 return h
@@ -300,7 +300,7 @@ func (s *Skiplist) Put(key []byte, v y.ValueStruct) {
 }
 // We do need to create a new node.
- height := randomHeight()
+ height := s.randomHeight()
 x := newNode(s.arena, key, v, height)
 // Try to increase s.height via CAS.
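Note on the skl/skl.go hunk above: math/rand's top-level functions share a single mutex-protected source, so concurrent skiplist writers contend on that lock, whereas ristretto's z.FastRand avoids that shared lock. The snippet below is not part of the patch; it is a minimal, standalone sketch showing that swapping the generator leaves the height distribution itself unchanged. The maxHeight and heightIncrease constants are only assumed to mirror skl/skl.go and may differ from the actual values.

```go
package main

import (
	"fmt"
	"math"
	"math/rand"

	"github.com/dgraph-io/ristretto/z"
)

// Assumed to mirror skl/skl.go: grow the tower by one level with
// probability ~1/3, capped at maxHeight.
const (
	maxHeight      = 20
	heightIncrease = math.MaxUint32 / 3
)

// lockedHeight draws a tower height from math/rand's globally locked source.
func lockedHeight() int {
	h := 1
	for h < maxHeight && rand.Uint32() <= heightIncrease {
		h++
	}
	return h
}

// fastHeight draws the same distribution from the lock-free z.FastRand,
// as the patched randomHeight above does.
func fastHeight() int {
	h := 1
	for h < maxHeight && z.FastRand() <= heightIncrease {
		h++
	}
	return h
}

func main() {
	const n = 1 << 20
	var sumLocked, sumFast int
	for i := 0; i < n; i++ {
		sumLocked += lockedHeight()
		sumFast += fastHeight()
	}
	// Both averages should land near 1.5; only the locking behaviour differs.
	fmt.Printf("avg height: locked=%.3f fast=%.3f\n",
		float64(sumLocked)/n, float64(sumFast)/n)
}
```

Both averages should come out close to 1.5 (the mean of a geometric height distribution with a roughly 1/3 continuation probability), so the ~30% write improvement quoted in the commit message comes from removing the shared lock, not from changing the distribution.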
diff --git a/skl/skl_test.go b/skl/skl_test.go
index 6bd075862..0be7a64e4 100644
--- a/skl/skl_test.go
+++ b/skl/skl_test.go
@@ -499,7 +499,7 @@ func BenchmarkReadWriteMap(b *testing.B) {
 b.RunParallel(func(pb *testing.PB) {
 rng := rand.New(rand.NewSource(time.Now().UnixNano()))
 for pb.Next() {
- if rand.Float32() < readFrac {
+ if rng.Float32() < readFrac {
 mutex.RLock()
 _, ok := m[string(randomKey(rng))]
 mutex.RUnlock()
@@ -516,3 +516,16 @@ func BenchmarkReadWriteMap(b *testing.B) {
 })
 }
 }
+
+func BenchmarkWrite(b *testing.B) {
+ value := newValue(123)
+ l := NewSkiplist(int64((b.N + 1) * MaxNodeSize))
+ defer l.DecrRef()
+ b.ResetTimer()
+ b.RunParallel(func(pb *testing.PB) {
+ rng := rand.New(rand.NewSource(time.Now().UnixNano()))
+ for pb.Next() {
+ l.Put(randomKey(rng), y.ValueStruct{Value: value, Meta: 0, UserMeta: 0})
+ }
+ })
+}
diff --git a/table/builder_test.go b/table/builder_test.go
index 3af2ce358..76296562e 100644
--- a/table/builder_test.go
+++ b/table/builder_test.go
@@ -57,7 +57,7 @@ func TestTableIndex(t *testing.T) {
 keysCount := 10000
 for _, opt := range opts {
 builder := NewTableBuilder(opt)
- filename := fmt.Sprintf("%s%c%d.sst", os.TempDir(), os.PathSeparator, rand.Int63())
+ filename := fmt.Sprintf("%s%c%d.sst", os.TempDir(), os.PathSeparator, rand.Uint32())
 f, err := y.OpenSyncedFile(filename, true)
 require.NoError(t, err)
@@ -80,11 +80,11 @@ func TestTableIndex(t *testing.T) {
 require.NoError(t, err, "unable to write to file")
 tbl, err := OpenTable(f, opt)
+ require.NoError(t, err, "unable to open table")
 if opt.DataKey == nil {
 // key id is zero if there is no datakey.
 require.Equal(t, tbl.KeyID(), uint64(0))
 }
- require.NoError(t, err, "unable to open table")
 // Ensure index is built correctly
 require.Equal(t, blockCount, len(tbl.blockIndex))
@@ -124,14 +124,42 @@ func BenchmarkBuilder(b *testing.B) {
 vs := y.ValueStruct{Value: []byte(val)}
 keysCount := 1300000 // This number of entries consumes ~64MB of memory.
- for i := 0; i < b.N; i++ { - opts := Options{BlockSize: 4 * 1024, BloomFalsePositive: 0.01} - builder := NewTableBuilder(opts) - for i := 0; i < keysCount; i++ { - builder.Add(key(i), vs, 0) - } + bench := func(b *testing.B, opt *Options) { + // KeyCount * (keySize + ValSize) + b.SetBytes(int64(keysCount) * (32 + 32)) + for i := 0; i < b.N; i++ { + opt.BlockSize = 4 * 1024 + opt.BloomFalsePositive = 0.01 + builder := NewTableBuilder(*opt) + + for i := 0; i < keysCount; i++ { + builder.Add(key(i), vs, 0) + } - _ = builder.Finish() + _ = builder.Finish() + } } + + b.Run("no compression", func(b *testing.B) { + var opt Options + opt.Compression = options.None + bench(b, &opt) + }) + b.Run("zstd compression", func(b *testing.B) { + var opt Options + opt.Compression = options.ZSTD + b.Run("level 1", func(b *testing.B) { + opt.ZSTDCompressionLevel = 1 + bench(b, &opt) + }) + b.Run("level 3", func(b *testing.B) { + opt.ZSTDCompressionLevel = 3 + bench(b, &opt) + }) + b.Run("level 15", func(b *testing.B) { + opt.ZSTDCompressionLevel = 15 + bench(b, &opt) + }) + }) } diff --git a/table/merge_iterator.go b/table/merge_iterator.go index e93edbbb9..e1809e027 100644 --- a/table/merge_iterator.go +++ b/table/merge_iterator.go @@ -55,17 +55,18 @@ func (n *node) setIterator(iter y.Iterator) { } func (n *node) setKey() { - if n.merge != nil { + switch { + case n.merge != nil: n.valid = n.merge.small.valid if n.valid { n.key = n.merge.small.key } - } else if n.concat != nil { + case n.concat != nil: n.valid = n.concat.Valid() if n.valid { n.key = n.concat.Key() } - } else { + default: n.valid = n.iter.Valid() if n.valid { n.key = n.iter.Key() @@ -74,11 +75,12 @@ func (n *node) setKey() { } func (n *node) next() { - if n.merge != nil { + switch { + case n.merge != nil: n.merge.Next() - } else if n.concat != nil { + case n.concat != nil: n.concat.Next() - } else { + default: n.iter.Next() } n.setKey() @@ -103,22 +105,22 @@ func (mi *MergeIterator) fix() { return } cmp := y.CompareKeys(mi.small.key, mi.bigger().key) - // Both the keys are equal. - if cmp == 0 { + switch { + case cmp == 0: // Both the keys are equal. // In case of same keys, move the right iterator ahead. mi.right.next() if &mi.right == mi.small { mi.swapSmall() } return - } else if cmp < 0 { // Small is less than bigger(). + case cmp < 0: // Small is less than bigger(). if mi.reverse { mi.swapSmall() } else { // we don't need to do anything. Small already points to the smallest. } return - } else { // bigger() is less than small. + default: // bigger() is less than small. if mi.reverse { // Do nothing since we're iterating in reverse. Small currently points to // the bigger key and that's okay in reverse iteration. @@ -206,11 +208,12 @@ func (mi *MergeIterator) Close() error { // NewMergeIterator creates a merge iterator. func NewMergeIterator(iters []y.Iterator, reverse bool) y.Iterator { - if len(iters) == 0 { + switch len(iters) { + case 0: return nil - } else if len(iters) == 1 { + case 1: return iters[0] - } else if len(iters) == 2 { + case 2: mi := &MergeIterator{ reverse: reverse, } diff --git a/table/table.go b/table/table.go index d68169384..25227be34 100644 --- a/table/table.go +++ b/table/table.go @@ -18,6 +18,7 @@ package table import ( "crypto/aes" + "encoding/binary" "fmt" "io" "math" @@ -81,7 +82,7 @@ type TableInterface interface { DoesNotHave(hash uint64) bool } -// Table represents a loaded table file with the info we have about it +// Table represents a loaded table file with the info we have about it. 
type Table struct { sync.Mutex @@ -97,10 +98,11 @@ type Table struct { smallest, biggest []byte // Smallest and largest keys (with timestamps). id uint64 // file id, part of filename - bf *z.Bloom Checksum []byte // Stores the total size of key-values stored in this table (including the size on vlog). estimatedSize uint64 + indexStart int + indexLen int IsInmemory bool // Set to true if the table is on level 0 and opened in memory. opt *Options @@ -146,6 +148,13 @@ func (t *Table) DecrRef() error { if err := os.Remove(filename); err != nil { return err } + // Delete all blocks from the cache. + for i := range t.blockIndex { + t.opt.Cache.Del(t.blockCacheKey(i)) + } + // Delete bloom filter from the cache. + t.opt.Cache.Del(t.bfCacheKey()) + } return nil } @@ -232,6 +241,7 @@ func OpenTable(fd *os.File, opts Options) (*Table, error) { if err := t.initBiggestAndSmallest(); err != nil { return nil, errors.Wrapf(err, "failed to initialize table") } + if opts.ChkMode == options.OnTableRead || opts.ChkMode == options.OnTableAndBlockRead { if err := t.VerifyChecksum(); err != nil { _ = fd.Close() @@ -320,6 +330,9 @@ func (t *Table) readIndex() error { readPos -= 4 buf := t.readNoFail(readPos, 4) checksumLen := int(y.BytesToU32(buf)) + if checksumLen < 0 { + return errors.New("checksum length less than zero. Data corrupted") + } // Read checksum. expectedChk := &pb.Checksum{} @@ -332,10 +345,12 @@ func (t *Table) readIndex() error { // Read index size from the footer. readPos -= 4 buf = t.readNoFail(readPos, 4) - indexLen := int(y.BytesToU32(buf)) + t.indexLen = int(y.BytesToU32(buf)) + // Read index. - readPos -= indexLen - data := t.readNoFail(readPos, indexLen) + readPos -= t.indexLen + t.indexStart = readPos + data := t.readNoFail(readPos, t.indexLen) if err := y.VerifyChecksum(data, expectedChk); err != nil { return y.Wrapf(err, "failed to verify checksum for table: %s", t.Filename()) @@ -354,8 +369,18 @@ func (t *Table) readIndex() error { y.Check(err) t.estimatedSize = index.EstimatedSize - t.bf = z.JSONUnmarshal(index.BloomFilter) t.blockIndex = index.Offsets + + // Avoid the cost of unmarshalling the bloom filters if the cache is absent. + if t.opt.Cache != nil { + var bf *z.Bloom + if bf, err = z.JSONUnmarshal(index.BloomFilter); err != nil { + return y.Wrapf(err, "failed to unmarshal bloom filter for the table %d in Table.readIndex", + t.id) + } + + t.opt.Cache.Set(t.bfCacheKey(), bf, int64(len(index.BloomFilter))) + } return nil } @@ -436,10 +461,25 @@ func (t *Table) block(idx int) (*block, error) { return blk, nil } -func (t *Table) blockCacheKey(idx int) uint64 { - y.AssertTrue(t.ID() < math.MaxUint32) +// bfCacheKey returns the cache key for bloom filter. +func (t *Table) bfCacheKey() []byte { + y.AssertTrue(t.id < math.MaxUint32) + buf := make([]byte, 4) + binary.BigEndian.PutUint32(buf, uint32(t.id)) + + // Without the "bf" prefix, we will have conflict with the blockCacheKey. + return append([]byte("bf"), buf...) +} + +func (t *Table) blockCacheKey(idx int) []byte { + y.AssertTrue(t.id < math.MaxUint32) y.AssertTrue(uint32(idx) < math.MaxUint32) - return (t.ID() << 32) | uint64(idx) + + buf := make([]byte, 8) + // Assume t.ID does not overflow uint32. 
+ binary.BigEndian.PutUint32(buf[:4], uint32(t.ID())) + binary.BigEndian.PutUint32(buf[4:], uint32(idx)) + return buf } // EstimatedSize returns the total size of key-values stored in this table (including the @@ -463,7 +503,44 @@ func (t *Table) ID() uint64 { return t.id } // DoesNotHave returns true if (but not "only if") the table does not have the key hash. // It does a bloom filter lookup. -func (t *Table) DoesNotHave(hash uint64) bool { return !t.bf.Has(hash) } +func (t *Table) DoesNotHave(hash uint64) bool { + var bf *z.Bloom + + // Return fast if cache is absent. + if t.opt.Cache == nil { + bf, _ := t.readBloomFilter() + return !bf.Has(hash) + } + + // Check if the bloomfilter exists in the cache. + if b, ok := t.opt.Cache.Get(t.bfCacheKey()); b != nil && ok { + bf = b.(*z.Bloom) + return !bf.Has(hash) + } + + bf, sz := t.readBloomFilter() + t.opt.Cache.Set(t.bfCacheKey(), bf, int64(sz)) + return !bf.Has(hash) +} + +// readBloomFilter reads the bloom filter from the SST and returns its length +// along with the bloom filter. +func (t *Table) readBloomFilter() (*z.Bloom, int) { + // Read bloom filter from the SST. + data := t.readNoFail(t.indexStart, t.indexLen) + index := pb.TableIndex{} + var err error + // Decrypt the table index if it is encrypted. + if t.shouldDecrypt() { + data, err = t.decrypt(data) + y.Check(err) + } + y.Check(proto.Unmarshal(data, &index)) + + bf, err := z.JSONUnmarshal(index.BloomFilter) + y.Check(err) + return bf, len(index.BloomFilter) +} // VerifyChecksum verifies checksum for all blocks of table. This function is called by // OpenTable() function. This function is also called inside levelsController.VerifyChecksum(). diff --git a/table/table_test.go b/table/table_test.go index 82bddf591..27a4f1d16 100644 --- a/table/table_test.go +++ b/table/table_test.go @@ -77,13 +77,9 @@ func buildTable(t *testing.T, keyValues [][]string, opts Options) *os.File { defer b.Close() // TODO: Add test for file garbage collection here. No files should be left after the tests here. - filename := fmt.Sprintf("%s%s%d.sst", os.TempDir(), string(os.PathSeparator), rand.Int63()) + filename := fmt.Sprintf("%s%s%d.sst", os.TempDir(), string(os.PathSeparator), rand.Uint32()) f, err := y.CreateSyncedFile(filename, true) - if t != nil { - require.NoError(t, err) - } else { - y.Check(err) - } + require.NoError(t, err) sort.Slice(keyValues, func(i, j int) bool { return keyValues[i][0] < keyValues[j][0] @@ -743,7 +739,10 @@ func TestTableChecksum(t *testing.T) { f := buildTestTable(t, "k", 10000, opts) fi, err := f.Stat() require.NoError(t, err, "unable to get file information") - f.WriteAt(rb, rand.Int63n(fi.Size())) + // Write random bytes at random location. + n, err := f.WriteAt(rb, rand.Int63n(fi.Size())) + require.NoError(t, err) + require.Equal(t, n, len(rb)) _, err = OpenTable(f, opts) if err == nil || !strings.Contains(err.Error(), "checksum") { diff --git a/test.sh b/test.sh index 90d21889c..b4e40601a 100755 --- a/test.sh +++ b/test.sh @@ -4,30 +4,43 @@ set -e go version +packages=$(go list ./... | grep github.com/dgraph-io/badger/v2/) + +if [[ ! -z "$TEAMCITY_VERSION" ]]; then + export GOFLAGS="-json" +fi + # Ensure that we can compile the binary. pushd badger go build -v . popd # Run the memory intensive tests first. -go test -v --manual=true -run='TestBigKeyValuePairs$' -go test -v --manual=true -run='TestPushValueLogLimit' +go test -v -run='TestBigKeyValuePairs$' --manual=true +go test -v -run='TestPushValueLogLimit' --manual=true # Run the special Truncate test. 
rm -rf p -go test -v --manual=true -run='TestTruncateVlogNoClose$' . +go test -v -run='TestTruncateVlogNoClose$' --manual=true truncate --size=4096 p/000000.vlog -go test -v --manual=true -run='TestTruncateVlogNoClose2$' . -go test -v --manual=true -run='TestTruncateVlogNoClose3$' . +go test -v -run='TestTruncateVlogNoClose2$' --manual=true +go test -v -run='TestTruncateVlogNoClose3$' --manual=true rm -rf p # Then the normal tests. +echo +echo "==> Starting test for table, skl and y package" +go test -v -race github.com/dgraph-io/badger/v2/skl +# Run test for all package except the top level package. The top level package support the +# `vlog_mmap` flag which rest of the packages don't support. +go test -v -race $packages + echo echo "==> Starting tests with value log mmapped..." -sleep 5 -go test -v --vlog_mmap=true -race ./... +# Run top level package tests with mmap flag. +go test -v -race github.com/dgraph-io/badger/v2 --vlog_mmap=true echo echo "==> Starting tests with value log not mmapped..." -sleep 5 -go test -v --vlog_mmap=false -race ./... +go test -v -race github.com/dgraph-io/badger/v2 --vlog_mmap=false + diff --git a/txn_test.go b/txn_test.go index 8450da910..647bd9806 100644 --- a/txn_test.go +++ b/txn_test.go @@ -837,9 +837,8 @@ func TestManagedDB(t *testing.T) { func TestArmV7Issue311Fix(t *testing.T) { dir, err := ioutil.TempDir("", "") - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) + defer removeDir(dir) db, err := Open(DefaultOptions(dir). @@ -848,31 +847,21 @@ func TestArmV7Issue311Fix(t *testing.T) { WithLevelOneSize(8 << 20). WithMaxTableSize(2 << 20). WithSyncWrites(false)) - if err != nil { - t.Fatalf("cannot open db at location %s: %v", dir, err) - } + + require.NoError(t, err) err = db.View(func(txn *Txn) error { return nil }) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) err = db.Update(func(txn *Txn) error { return txn.SetEntry(NewEntry([]byte{0x11}, []byte{0x22})) }) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) err = db.Update(func(txn *Txn) error { return txn.SetEntry(NewEntry([]byte{0x11}, []byte{0x22})) }) - if err != nil { - t.Fatal(err) - } - - if err = db.Close(); err != nil { - t.Fatal(err) - } + require.NoError(t, err) + require.NoError(t, db.Close()) } diff --git a/value.go b/value.go index cdf575a29..b0d94ad86 100644 --- a/value.go +++ b/value.go @@ -197,7 +197,7 @@ func (lf *logFile) encryptionEnabled() bool { } func (lf *logFile) munmap() (err error) { - if lf.loadingMode != options.MemoryMap { + if lf.loadingMode != options.MemoryMap || len(lf.fmap) == 0 { // Nothing to do return nil } @@ -436,15 +436,18 @@ func (vlog *valueLog) iterate(lf *logFile, offset uint32, fn logEntry) (uint32, var lastCommit uint64 var validEndOffset uint32 = offset + +loop: for { e, err := read.Entry(reader) - if err == io.EOF { - break - } else if err == io.ErrUnexpectedEOF || err == errTruncate { - break - } else if err != nil { + switch { + case err == io.EOF: + break loop + case err == io.ErrUnexpectedEOF || err == errTruncate: + break loop + case err != nil: return 0, err - } else if e == nil { + case e == nil: continue } @@ -455,29 +458,30 @@ func (vlog *valueLog) iterate(lf *logFile, offset uint32, fn logEntry) (uint32, vp.Offset = e.offset vp.Fid = lf.fid - if e.meta&bitTxn > 0 { + switch { + case e.meta&bitTxn > 0: txnTs := y.ParseTs(e.Key) if lastCommit == 0 { lastCommit = txnTs } if lastCommit != txnTs { - break + break loop } - } else if e.meta&bitFinTxn > 0 { + case e.meta&bitFinTxn > 0: txnTs, err := 
strconv.ParseUint(string(e.Value), 10, 64) if err != nil || lastCommit != txnTs { - break + break loop } // Got the end of txn. Now we can store them. lastCommit = 0 validEndOffset = read.recordOffset - } else { + default: if lastCommit != 0 { // This is most likely an entry which was moved as part of GC. // We shouldn't get this entry in the middle of a transaction. - break + break loop } validEndOffset = read.recordOffset } @@ -493,7 +497,9 @@ func (vlog *valueLog) iterate(lf *logFile, offset uint32, fn logEntry) (uint32, } func (vlog *valueLog) rewrite(f *logFile, tr trace.Trace) error { - maxFid := atomic.LoadUint32(&vlog.maxFid) + vlog.filesLock.RLock() + maxFid := vlog.maxFid + vlog.filesLock.RUnlock() y.AssertTruef(uint32(f.fid) < maxFid, "fid to move: %d. Current max fid: %d", f.fid, maxFid) tr.LazyPrintf("Rewriting fid: %d", f.fid) @@ -523,12 +529,19 @@ func (vlog *valueLog) rewrite(f *logFile, tr trace.Trace) error { var vp valuePointer vp.Decode(vs.Value) + // If the entry found from the LSM Tree points to a newer vlog file, don't do anything. if vp.Fid > f.fid { return nil } + // If the entry found from the LSM Tree points to an offset greater than the one + // read from vlog, don't do anything. if vp.Offset > e.offset { return nil } + // If the entry read from LSM Tree and vlog file point to the same vlog file and offset, + // insert them back into the DB. + // NOTE: It might be possible that the entry read from the LSM Tree points to + // an older vlog file. See the comments in the else part. if vp.Fid == f.fid && vp.Offset == e.offset { moved++ // This new entry only contains the key, and a pointer to the value. @@ -564,7 +577,46 @@ func (vlog *valueLog) rewrite(f *logFile, tr trace.Trace) error { wb = append(wb, ne) size += es } else { - vlog.db.opt.Warningf("This entry should have been caught. %+v\n", e) + // It might be possible that the entry read from LSM Tree points to an older vlog file. + // This can happen in the following situation. Assume DB is opened with + // numberOfVersionsToKeep=1 + // + // Now, if we have ONLY one key in the system "FOO" which has been updated 3 times and + // the same key has been garbage collected 3 times, we'll have 3 versions of the movekey + // for the same key "FOO". + // NOTE: moveKeyi is the moveKey with version i + // Assume we have 3 move keys in L0. + // - moveKey1 (points to vlog file 10), + // - moveKey2 (points to vlog file 14) and + // - moveKey3 (points to vlog file 15). + + // Also, assume there is another move key "moveKey1" (points to vlog file 6) (this is + // also a move Key for key "FOO" ) on upper levels (let's say 3). The move key + // "moveKey1" on level 0 was inserted because vlog file 6 was GCed. + // + // Here's what the arrangement looks like + // L0 => (moveKey1 => vlog10), (moveKey2 => vlog14), (moveKey3 => vlog15) + // L1 => .... + // L2 => .... + // L3 => (moveKey1 => vlog6) + // + // When L0 compaction runs, it keeps only moveKey3 because the number of versions + // to keep is set to 1. (we've dropped moveKey1's latest version) + // + // The new arrangement of keys is + // L0 => .... + // L1 => (moveKey3 => vlog15) + // L2 => .... + // L3 => (moveKey1 => vlog6) + // + // Now if we try to GC vlog file 10, the entry read from vlog file will point to vlog10 + // but the entry read from LSM Tree will point to vlog6. The move key read from LSM tree + // will point to vlog6 because we've asked for version 1 of the move key. 
+ // + // This might seem like an issue but it's not really an issue because the user has set + // the number of versions to keep to 1 and the latest version of moveKey points to the + // correct vlog file and offset. The stale move key on L3 will be eventually dropped by + // compaction because there is a newer versions in the upper levels. } return nil } @@ -762,10 +814,9 @@ func (vlog *valueLog) dropAll() (int, error) { } vlog.db.opt.Infof("Value logs deleted. Creating value log file: 0") - if _, err := vlog.createVlogFile(0); err != nil { + if _, err := vlog.createVlogFile(0); err != nil { // Called while writes are stopped. return count, err } - atomic.StoreUint32(&vlog.maxFid, 0) return count, nil } @@ -786,12 +837,12 @@ type valueLog struct { // guards our view of which files exist, which to be deleted, how many active iterators filesLock sync.RWMutex filesMap map[uint32]*logFile + maxFid uint32 filesToBeDeleted []uint32 // A refcount of iterators -- when this hits zero, we can delete the filesToBeDeleted. numActiveIterators int32 db *DB - maxFid uint32 // accessed via atomics. writableLogOffset uint32 // read by read, written by write. Must access via atomics. numEntriesWritten uint32 opt Options @@ -856,7 +907,11 @@ func (lf *logFile) open(path string, flags uint32) error { return errFile(err, lf.path, "Unable to run file.Stat") } sz := fi.Size() - y.AssertTruef(sz <= math.MaxUint32, "file size: %d greater than %d", sz, math.MaxUint32) + y.AssertTruef( + sz <= math.MaxUint32, + "file size: %d greater than %d", + uint32(sz), uint32(math.MaxUint32), + ) lf.size = uint32(sz) if sz < vlogHeaderSize { // Every vlog file should have at least vlogHeaderSize. If it is less than vlogHeaderSize @@ -947,14 +1002,15 @@ func (vlog *valueLog) createVlogFile(fid uint32) (*logFile, error) { if err = lf.mmap(2 * vlog.opt.ValueLogFileSize); err != nil { return nil, errFile(err, lf.path, "Mmap value log file") } + + vlog.filesLock.Lock() + vlog.filesMap[fid] = lf + vlog.maxFid = fid // writableLogOffset is only written by write func, by read by Read func. // To avoid a race condition, all reads and updates to this variable must be // done via atomics. atomic.StoreUint32(&vlog.writableLogOffset, vlogHeaderSize) vlog.numEntriesWritten = 0 - - vlog.filesLock.Lock() - vlog.filesMap[fid] = lf vlog.filesLock.Unlock() return lf, nil @@ -1105,12 +1161,12 @@ func (vlog *valueLog) open(db *DB, ptr valuePointer, replayFn logEntry) error { // plain text mode or vice versa. A single vlog file can't have both // encrypted entries and plain text entries. if last.encryptionEnabled() != vlog.db.shouldEncrypt() { - newid := atomic.AddUint32(&vlog.maxFid, 1) + newid := vlog.maxFid + 1 _, err := vlog.createVlogFile(newid) if err != nil { return y.Wrapf(err, "Error while creating log file %d in valueLog.open", newid) } - last, ok = vlog.filesMap[vlog.maxFid] + last, ok = vlog.filesMap[newid] y.AssertTrue(ok) } lastOffset, err := last.fd.Seek(0, io.SeekEnd) @@ -1172,7 +1228,7 @@ func (vlog *valueLog) Close() error { err = munmapErr } - maxFid := atomic.LoadUint32(&vlog.maxFid) + maxFid := vlog.maxFid if !vlog.opt.ReadOnly && id == maxFid { // truncate writable log file to correct offset. if truncErr := f.fd.Truncate( @@ -1265,12 +1321,12 @@ func (reqs requests) IncrRef() { // if fid >= vlog.maxFid. In some cases such as replay(while opening db), it might be called with // fid < vlog.maxFid. To sync irrespective of file id just call it with math.MaxUint32. 
func (vlog *valueLog) sync(fid uint32) error { - if vlog.opt.SyncWrites { + if vlog.opt.SyncWrites || vlog.opt.InMemory { return nil } vlog.filesLock.RLock() - maxFid := atomic.LoadUint32(&vlog.maxFid) + maxFid := vlog.maxFid // During replay it is possible to get sync call with fid less than maxFid. // Because older file has already been synced, we can return from here. if fid < maxFid || len(vlog.filesMap) == 0 { @@ -1303,7 +1359,7 @@ func (vlog *valueLog) write(reqs []*request) error { return nil } vlog.filesLock.RLock() - maxFid := atomic.LoadUint32(&vlog.maxFid) + maxFid := vlog.maxFid curlf := vlog.filesMap[maxFid] vlog.filesLock.RUnlock() @@ -1335,7 +1391,7 @@ func (vlog *valueLog) write(reqs []*request) error { return err } - newid := atomic.AddUint32(&vlog.maxFid, 1) + newid := vlog.maxFid + 1 y.AssertTruef(newid > 0, "newid has overflown uint32: %v", newid) newlf, err := vlog.createVlogFile(newid) if err != nil { @@ -1396,14 +1452,26 @@ func (vlog *valueLog) write(reqs []*request) error { // Gets the logFile and acquires and RLock() for the mmap. You must call RUnlock on the file // (if non-nil) -func (vlog *valueLog) getFileRLocked(fid uint32) (*logFile, error) { +func (vlog *valueLog) getFileRLocked(vp valuePointer) (*logFile, error) { vlog.filesLock.RLock() defer vlog.filesLock.RUnlock() - ret, ok := vlog.filesMap[fid] + ret, ok := vlog.filesMap[vp.Fid] if !ok { // log file has gone away, will need to retry the operation. return nil, ErrRetry } + + // Check for valid offset if we are reading from writable log. + maxFid := vlog.maxFid + if vp.Fid == maxFid { + currentOffset := vlog.woffset() + if vp.Offset >= currentOffset { + return nil, errors.Errorf( + "Invalid value pointer offset: %d greater than current offset: %d", + vp.Offset, currentOffset) + } + } + ret.lock.RLock() return ret, nil } @@ -1411,13 +1479,6 @@ func (vlog *valueLog) getFileRLocked(fid uint32) (*logFile, error) { // Read reads the value log at a given location. // TODO: Make this read private. func (vlog *valueLog) Read(vp valuePointer, s *y.Slice) ([]byte, func(), error) { - // Check for valid offset if we are reading from writable log. - maxFid := atomic.LoadUint32(&vlog.maxFid) - if vp.Fid == maxFid && vp.Offset >= vlog.woffset() { - return nil, nil, errors.Errorf( - "Invalid value pointer offset: %d greater than current offset: %d", - vp.Offset, vlog.woffset()) - } buf, lf, err := vlog.readValueBytes(vp, s) // log file is locked so, decide whether to lock immediately or let the caller to // unlock it, after caller uses it. @@ -1467,10 +1528,11 @@ func (vlog *valueLog) getUnlockCallback(lf *logFile) func() { // readValueBytes return vlog entry slice and read locked log file. Caller should take care of // logFile unlocking. 
func (vlog *valueLog) readValueBytes(vp valuePointer, s *y.Slice) ([]byte, *logFile, error) { - lf, err := vlog.getFileRLocked(vp.Fid) + lf, err := vlog.getFileRLocked(vp) if err != nil { return nil, nil, err } + buf, err := lf.read(vp, s) return buf, lf, err } @@ -1479,10 +1541,11 @@ func (vlog *valueLog) pickLog(head valuePointer, tr trace.Trace) (files []*logFi vlog.filesLock.RLock() defer vlog.filesLock.RUnlock() fids := vlog.sortedFids() - if len(fids) <= 1 { + switch { + case len(fids) <= 1: tr.LazyPrintf("Only one or less value log file.") return nil - } else if head.Fid == 0 { + case head.Fid == 0: tr.LazyPrintf("Head pointer is at zero.") return nil } diff --git a/value_test.go b/value_test.go index 304ac7d41..8aa7c0909 100644 --- a/value_test.go +++ b/value_test.go @@ -780,6 +780,9 @@ func TestPenultimateLogCorruption(t *testing.T) { if db0.valueDirGuard != nil { require.NoError(t, db0.valueDirGuard.release()) } + require.NoError(t, db0.vlog.Close()) + require.NoError(t, db0.manifest.close()) + require.NoError(t, db0.registry.Close()) opt.Truncate = true db1, err := Open(opt) @@ -799,7 +802,9 @@ func TestPenultimateLogCorruption(t *testing.T) { func checkKeys(t *testing.T, kv *DB, keys [][]byte) { i := 0 txn := kv.NewTransaction(false) + defer txn.Discard() iter := txn.NewIterator(IteratorOptions{}) + defer iter.Close() for iter.Seek(keys[0]); iter.Valid(); iter.Next() { require.Equal(t, iter.Item().Key(), keys[i]) i++ diff --git a/y/y_test.go b/y/y_test.go index 168da889b..d1b963184 100644 --- a/y/y_test.go +++ b/y/y_test.go @@ -176,7 +176,7 @@ func TestPagebufferReader2(t *testing.T) { require.Equal(t, n, 10, "length of buffer and length written should be equal") require.NoError(t, err, "unable to write bytes to buffer") - randOffset := int(rand.Int31n(int32(b.length))) + randOffset := int(rand.Int31n(int32(b.length) - 1)) randLength := int(rand.Int31n(int32(b.length - randOffset))) reader := b.NewReaderAt(randOffset) // Read randLength bytes.