Skip to content

fix(test): fix flakiness of TestPersistLFDiscardStats #1963

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jun 12, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ func (lk *lockedKeys) all() []uint64 {
// DB provides the various functions required to interact with Badger.
// DB is thread-safe.
type DB struct {
testOnlyDBExtensions

lock sync.RWMutex // Guards list of inmemory tables, not individual reads and writes.

dirLockGuard *directoryLockGuard
Expand Down Expand Up @@ -252,6 +254,9 @@ func Open(opt Options) (*DB, error) {
bannedNamespaces: &lockedKeys{keys: make(map[uint64]struct{})},
threshold: initVlogThreshold(&opt),
}

db.setSyncChan(opt.syncChan)

// Cleanup all the goroutines started by badger in case of an error.
defer func() {
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ import (
//
// Each option X is documented on the WithX method.
type Options struct {
testOnlyOptions

// Required options.

Dir string
Expand Down
97 changes: 97 additions & 0 deletions test_only.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we could rename this file to test_extensions.go

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

* Copyright 2023 Dgraph Labs, Inc. and Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package badger

// Important: Do NOT import the "testing" package in this file; doing so
// would pull test-only dependencies into the production build.

// TODO: Consider using this with specific compilation tags so that it only
// shows up when performing testing (e.g., specify build tag=unit).
// We are not yet ready to do that, as it may impact customer usage as
// well as requiring us to update the CI build flags. Moreover, the
// current model does not actually incur any significant cost.
// If we do this, we will also want to introduce a parallel file that
// overrides some of these structs and functions with empty contents.

// testOnlyOptions specifies an extension to the type Options that we want to
// use only in the context of testing. It is embedded in Options, so its
// fields are reachable directly on an Options value (e.g. opt.syncChan).
type testOnlyOptions struct {
// syncChan is the channel on which a DB instance reports that specific
// internal activities have occurred. Open hands it to the DB through
// DB.setSyncChan, and DB.logToSyncChan sends on it. When it is nil, no
// messages are sent. Currently, this is only used in testing activities.
syncChan chan string

// onCloseDiscardCapture, when non-nil, is filled in by
// DB.captureDiscardStats with the entries of the value log's discardStats
// while the value log is being closed. Currently, we only consider using
// this during testing.
onCloseDiscardCapture map[uint64]uint64
}

// withSyncChan returns a copy of opt whose test-only syncChan field is ch.
// The receiver is passed by value, so the caller's Options is left untouched.
// When no channel is configured (nil), DB.logToSyncChan sends nothing.
func (opt Options) withSyncChan(ch chan string) Options {
	opt.testOnlyOptions.syncChan = ch
	return opt
}

// withOnCloseDiscardCapture returns a copy of opt whose test-only
// onCloseDiscardCapture field refers to the map c. The map itself is not
// duplicated: the returned Options and the caller share the same map, so the
// entries written by DB.captureDiscardStats are visible through c afterwards.
func (opt Options) withOnCloseDiscardCapture(c map[uint64]uint64) Options {
	opt.testOnlyOptions.onCloseDiscardCapture = c
	return opt
}

// testOnlyDBExtensions specifies an extension to the type DB that we want to
// use only in the context of testing. It is embedded in DB.
type testOnlyDBExtensions struct {
// syncChan receives the messages sent by DB.logToSyncChan. It is assigned
// via DB.setSyncChan; while it is nil, logToSyncChan does nothing.
syncChan chan string
}

// setSyncChan stores ch as the syncChan of the embedded testOnlyDBExtensions.
// It is a plain setter; its only purpose is to keep the test-specific
// behaviors of a production Badger system confined to this single file.
func (db *DB) setSyncChan(ch chan string) {
	db.testOnlyDBExtensions.syncChan = ch
}

// logToSyncChan sends msg on the DB's syncChan. If db.syncChan is nil (it was
// never set), the call is silently ignored. The send is a plain channel send,
// so it blocks until the channel can accept the message.
//
// The DB is expected never to close this channel; allocating and closing it
// is the responsibility of the test module.
func (db *DB) logToSyncChan(msg string) {
	ch := db.syncChan
	if ch == nil {
		return
	}
	ch <- msg
}

// captureDiscardStats copies every entry of the discardStats maintained by
// vlog into the onCloseDiscardCapture map specified by db.opt. Entries already
// in the map are kept unless overwritten by an entry with the same id. Of
// course, if db.opt.onCloseDiscardCapture is nil (as expected for a production
// system as opposed to a test system), this is a no-op.
func (db *DB) captureDiscardStats() {
	captured := db.opt.onCloseDiscardCapture
	if captured == nil {
		return
	}

	stats := db.vlog.discardStats
	// Hold the discardStats lock for the whole iteration.
	stats.Lock()
	stats.Iterate(func(fid, discard uint64) {
		captured[fid] = discard
	})
	stats.Unlock()
}
58 changes: 58 additions & 0 deletions test_util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright 2023 Dgraph Labs, Inc. and Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package badger

import (
"testing"
"time"
)

// waitForMessage blocks until `count` messages equal to `expected` have been
// received from `ch`, or until `timeout` seconds have elapsed, whichever
// happens first. Messages that do not match `expected` are consumed, logged
// and otherwise ignored.
//
// Outcomes are reported through `t`:
//   - count <= 0: logs a note and returns immediately without reading `ch`;
//   - `ch` is closed before enough matches arrive: t.Errorf and return;
//   - the timeout elapses first: t.Errorf and return.
//
// Failures use t.Errorf rather than t.Fatalf, so the calling test keeps
// running after a failed wait.
//
// NOTE(review): this helper needs the "testing" package. The diff shows it in
// test_util.go, which by name is not a _test.go file — confirm that it is not
// compiled into production builds.
func waitForMessage(ch chan string, expected string, count int, timeout int, t *testing.T) {
	// Attribute failures to the calling test line rather than to this helper.
	t.Helper()

	if count <= 0 {
		t.Logf("Will skip waiting for %s since expected count <= 0.",
			expected)
		return
	}

	tout := time.NewTimer(time.Duration(timeout) * time.Second)
	// Release the timer on every return path instead of leaving it pending
	// for up to `timeout` seconds after the wait has already finished.
	defer tout.Stop()

	remaining := count
	for {
		select {
		case curMsg, ok := <-ch:
			if !ok {
				t.Errorf("Test channel closed while waiting for "+
					"message %s with %d remaining instances expected",
					expected, remaining)
				return
			}
			t.Logf("Found message: %s", curMsg)
			if curMsg == expected {
				remaining--
				if remaining == 0 {
					return
				}
			}
		case <-tout.C:
			t.Errorf("Timed out after %d seconds while waiting on test chan "+
				"for message '%s' with %d remaining instances expected",
				timeout, expected, remaining)
			return
		}
	}
}
6 changes: 6 additions & 0 deletions value.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,8 @@ func (vlog *valueLog) init(db *DB) {
lf, err := InitDiscardStats(vlog.opt)
y.Check(err)
vlog.discardStats = lf
// See TestPersistLFDiscardStats for purpose of statement below.
db.logToSyncChan("End: vlog.init(db)")
}

func (vlog *valueLog) open(db *DB) error {
Expand Down Expand Up @@ -640,6 +642,7 @@ func (vlog *valueLog) Close() error {
}
}
if vlog.discardStats != nil {
vlog.db.captureDiscardStats()
if terr := vlog.discardStats.Close(-1); terr != nil && err == nil {
err = terr
}
Expand Down Expand Up @@ -1103,6 +1106,9 @@ func (vlog *valueLog) updateDiscardStats(stats map[uint32]int64) {
for fid, discard := range stats {
vlog.discardStats.Update(fid, discard)
}
// The following is to coordinate with some test cases where we want to
// verify that at least one iteration of updateDiscardStats has been completed.
vlog.db.logToSyncChan("updateDiscardStats iteration done")
}

type vlogThreshold struct {
Expand Down
16 changes: 8 additions & 8 deletions value_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"reflect"
"sync"
"testing"
"time"

humanize "github.com/dustin/go-humanize"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -496,6 +495,10 @@ func TestPersistLFDiscardStats(t *testing.T) {
opt.CompactL0OnClose = false
opt.MemTableSize = 1 << 15
opt.ValueThreshold = 1 << 10
tChan := make(chan string, 100)
defer close(tChan)
opt = opt.withSyncChan(tChan)
opt = opt.withOnCloseDiscardCapture(make(map[uint64]uint64))

db, err := Open(opt)
require.NoError(t, err)
Expand All @@ -522,14 +525,11 @@ func TestPersistLFDiscardStats(t *testing.T) {
require.NoError(t, err)
}

time.Sleep(2 * time.Second) // wait for compaction to complete
// Wait for invocation of updateDiscardStats at least once -- timeout after 60 seconds.
waitForMessage(tChan, "updateDiscardStats iteration done", 1, 60, t)

persistedMap := make(map[uint64]uint64)
db.vlog.discardStats.Lock()
require.True(t, db.vlog.discardStats.Len() > 1, "some discardStats should be generated")
db.vlog.discardStats.Iterate(func(fid, val uint64) {
persistedMap[fid] = val
})

db.vlog.discardStats.Unlock()
require.NoError(t, db.Close())
Expand All @@ -539,13 +539,13 @@ func TestPersistLFDiscardStats(t *testing.T) {
db, err = Open(opt)
require.NoError(t, err)
defer db.Close()
time.Sleep(1 * time.Second) // Wait for discardStats to be populated by populateDiscardStats().
waitForMessage(tChan, "End: vlog.init(db)", 1, 60, t)
db.vlog.discardStats.Lock()
statsMap := make(map[uint64]uint64)
db.vlog.discardStats.Iterate(func(fid, val uint64) {
statsMap[fid] = val
})
require.True(t, reflect.DeepEqual(persistedMap, statsMap), "Discard maps are not equal")
require.True(t, reflect.DeepEqual(opt.onCloseDiscardCapture, statsMap), "Discard maps are not equal")
db.vlog.discardStats.Unlock()
}

Expand Down