Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions store/iavlstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,13 +209,6 @@ func (s *IAVLStore) Prune() error {
return nil
}

func (s *IAVLStore) GetSnapshot() Snapshot {
// This isn't an actual snapshot obviously, and never will be, but lets pretend...
return &iavlStoreSnapshot{
IAVLStore: s,
}
}

func (s *IAVLStore) GetSnapshotAt(version int64) (Snapshot, error) {
panic("not implemented")
}
Expand Down
4 changes: 0 additions & 4 deletions store/logstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,6 @@ func (s *LogStore) Prune() error {
return s.store.Prune()
}

func (s *LogStore) GetSnapshot() Snapshot {
return s.store.GetSnapshot()
}

func (s *LogStore) GetSnapshotAt(version int64) (Snapshot, error) {
return s.store.GetSnapshotAt(version)
}
4 changes: 0 additions & 4 deletions store/memstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,6 @@ func (m *MemStore) Prune() error {
return nil
}

func (m *MemStore) GetSnapshot() Snapshot {
panic("not implemented")
}

func (m *MemStore) GetSnapshotAt(version int64) (Snapshot, error) {
panic("not implemented")
}
10 changes: 1 addition & 9 deletions store/multi_writer_app_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func init() {
Namespace: "loomchain",
Subsystem: "multi_writer_appstore",
Name: "get_snapshot",
Help: "How long MultiWriterAppStore.GetSnapshot() took to execute (in seconds)",
Help: "How long MultiWriterAppStore.GetSnapshotAt() took to execute (in seconds)",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
}, []string{},
)
Expand Down Expand Up @@ -272,14 +272,6 @@ func (s *MultiWriterAppStore) Prune() error {
return s.appStore.Prune()
}

func (s *MultiWriterAppStore) GetSnapshot() Snapshot {
snapshot, err := s.GetSnapshotAt(0)
if err != nil {
panic(err)
}
return snapshot
}

func (s *MultiWriterAppStore) GetSnapshotAt(version int64) (Snapshot, error) {
defer func(begin time.Time) {
getSnapshotDuration.Observe(time.Since(begin).Seconds())
Expand Down
42 changes: 40 additions & 2 deletions store/multi_writer_app_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ func (m *MultiWriterAppStoreTestSuite) TestMultiWriterAppStoreSnapshotFlushInter
store.Set([]byte("test2"), []byte("test2v2"))

// this snapshot is from memory
snapshotv1 := store.GetSnapshot()
snapshotv1, err := store.GetSnapshotAt(0)
require.NoError(err)
require.Equal([]byte("test1"), snapshotv1.Get([]byte("test1")))
require.Equal([]byte("test2"), snapshotv1.Get([]byte("test2")))

Expand All @@ -119,7 +120,8 @@ func (m *MultiWriterAppStoreTestSuite) TestMultiWriterAppStoreSnapshotFlushInter
require.NoError(err)

// get snapshotv2
snapshotv2 := store.GetSnapshot()
snapshotv2, err := store.GetSnapshotAt(0)
require.NoError(err)
require.Equal([]byte("test1v2"), snapshotv2.Get([]byte("test1")))
require.Equal([]byte("test2v2"), snapshotv2.Get([]byte("test2")))

Expand Down Expand Up @@ -252,6 +254,42 @@ func (m *MultiWriterAppStoreTestSuite) TestIAVLRangeWithlimit() {
require.Equal(4, len(rangeData))
}

func (m *MultiWriterAppStoreTestSuite) TestStoreRange() {
require := m.Require()
mws, err := mockMultiWriterStore(0, 0)
require.NoError(err)
prefixes, entries := populateStore(mws)
verifyRange(require, "MultiWriterAppStore", mws, prefixes, entries)
_, _, err = mws.SaveVersion()
require.NoError(err)
verifyRange(require, "MultiWriterAppStore", mws, prefixes, entries)
}

func (m *MultiWriterAppStoreTestSuite) TestSnapshotRange() {
require := m.Require()
mws, err := mockMultiWriterStore(0, 0)
require.NoError(err)
prefixes, entries := populateStore(mws)
verifyRange(require, "MultiWriterAppStore", mws, prefixes, entries)
mws.SaveVersion()

// snapshot should see all the data that was saved to disk
func() {
snap, err := mws.GetSnapshotAt(0)
require.NoError(err)
defer snap.Release()

verifyRange(require, "MultiWriterAppStoreSnapshot", snap, prefixes, entries)
}()
}

func (m *MultiWriterAppStoreTestSuite) TestConcurrentSnapshots() {
require := m.Require()
mws, err := mockMultiWriterStore(0, 0)
require.NoError(err)
verifyConcurrentSnapshots(require, mws)
}

func mockMultiWriterStore(appStoreFlushInterval, evmStoreFlushInterval int64) (*MultiWriterAppStore, error) {
// Using different flush intervals for the app & evm stores is not supported.
if appStoreFlushInterval > 0 && evmStoreFlushInterval > 0 && appStoreFlushInterval != evmStoreFlushInterval {
Expand Down
1 change: 0 additions & 1 deletion store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ type VersionedKVStore interface {
SaveVersion() ([]byte, int64, error)
// Delete old version of the store
Prune() error
GetSnapshot() Snapshot
GetSnapshotAt(version int64) (Snapshot, error)
}

Expand Down
77 changes: 23 additions & 54 deletions store/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,6 @@ type StoreTestSuite struct {
suite.Suite
store VersionedKVStore
StoreName string
//nolint:unused,structcheck
supportsSnapshots bool
}

func populateStore(s KVWriter) ([][]byte, []*plugin.RangeEntry) {
Expand Down Expand Up @@ -219,8 +217,9 @@ func populateStore(s KVWriter) ([][]byte, []*plugin.RangeEntry) {
return prefixes, entries
}

func (ts *StoreTestSuite) VerifyRange(s KVReader, prefixes [][]byte, entries []*plugin.RangeEntry) {
require := ts.Require()
func verifyRange(
require *require.Assertions, storeName string, s KVReader, prefixes [][]byte, entries []*plugin.RangeEntry,
) {
// TODO: This passed before the last Tendermint upgrade, doesn't anymore, figure out why.
/*
expected := []*plugin.RangeEntry{
Expand All @@ -237,8 +236,8 @@ func (ts *StoreTestSuite) VerifyRange(s KVReader, prefixes [][]byte, entries []*
}
*/
require.Len(s.Range([]byte("abc123")), 1)
require.EqualValues([]byte{}, s.Range([]byte("abc123"))[0].Key, ts.StoreName)
require.EqualValues(entries[1].Value, s.Range([]byte("abc123"))[0].Value, ts.StoreName)
require.EqualValues([]byte{}, s.Range([]byte("abc123"))[0].Key, storeName)
require.EqualValues(entries[1].Value, s.Range([]byte("abc123"))[0].Value, storeName)

key2, err := util.UnprefixKey(entries[2].Key, prefixes[0])
require.NoError(err)
Expand All @@ -253,10 +252,10 @@ func (ts *StoreTestSuite) VerifyRange(s KVReader, prefixes [][]byte, entries []*
{key4, entries[4].Value},
}
actual := s.Range(prefixes[0])
require.Len(actual, len(expected), ts.StoreName)
if ts.StoreName != "MemStore" {
require.Len(actual, len(expected), storeName)
if storeName != "MemStore" {
for i := range expected {
require.EqualValues(expected[i], actual[i], ts.StoreName)
require.EqualValues(expected[i], actual[i], storeName)
}
}

Expand All @@ -275,12 +274,12 @@ func (ts *StoreTestSuite) VerifyRange(s KVReader, prefixes [][]byte, entries []*
{key8, entries[8].Value},
}
actual = s.Range(prefixes[1])
require.Len(actual, len(expected), ts.StoreName)
require.Len(actual, len(expected), storeName)

// TODO: MemStore keys should be iterated in ascending order
if ts.StoreName != "MemStore" {
if storeName != "MemStore" {
for i := range expected {
require.EqualValues(expected[i], actual[i], ts.StoreName)
require.EqualValues(expected[i], actual[i], storeName)
}
}

Expand All @@ -295,25 +294,24 @@ func (ts *StoreTestSuite) VerifyRange(s KVReader, prefixes [][]byte, entries []*
{key10, entries[10].Value},
}
actual = s.Range(prefixes[2])
require.Len(actual, len(expected), ts.StoreName)
if ts.StoreName != "MemStore" {
require.Len(actual, len(expected), storeName)
if storeName != "MemStore" {
for i := range expected {
require.EqualValues(expected[i], actual[i], ts.StoreName)
require.EqualValues(expected[i], actual[i], storeName)
}
}
}

func (ts *StoreTestSuite) TestStoreRange() {
require := ts.Require()
prefixes, entries := populateStore(ts.store)
ts.VerifyRange(ts.store, prefixes, entries)
verifyRange(require, ts.StoreName, ts.store, prefixes, entries)
_, _, err := ts.store.SaveVersion()
require.NoError(err)
ts.VerifyRange(ts.store, prefixes, entries)
verifyRange(require, ts.StoreName, ts.store, prefixes, entries)
}

func (ts *StoreTestSuite) VerifyConcurrentSnapshots() {
require := ts.Require()
func verifyConcurrentSnapshots(require *require.Assertions, s VersionedKVStore) {
// start one writer go-routine and a bunch of reader go-routines
var wg sync.WaitGroup
numOps := 10000
Expand All @@ -324,13 +322,13 @@ func (ts *StoreTestSuite) VerifyConcurrentSnapshots() {
defer wg.Done()

for i := 0; i < numOps; i++ {
ts.store.Set([]byte(fmt.Sprintf("key/%d", i)), []byte(fmt.Sprintf("value/%d", i)))
s.Set([]byte(fmt.Sprintf("key/%d", i)), []byte(fmt.Sprintf("value/%d", i)))
if i%10 == 0 {
_, _, err := ts.store.SaveVersion()
_, _, err := s.SaveVersion()
require.NoError(err)
}
}
_, _, err := ts.store.SaveVersion()
_, _, err := s.SaveVersion()
require.NoError(err)
}()
wg.Wait()
Expand All @@ -347,7 +345,9 @@ func (ts *StoreTestSuite) VerifyConcurrentSnapshots() {
if snap != nil {
snap.Release()
}
snap = ts.store.GetSnapshot()
var err error
snap, err = s.GetSnapshotAt(0)
require.NoError(err)
}
snap.Get([]byte(fmt.Sprintf("key/%d", i)))
}
Expand All @@ -374,7 +374,6 @@ type IAVLStoreTestSuite struct {

func (ts *IAVLStoreTestSuite) SetupSuite() {
ts.StoreName = "IAVLStore"
ts.supportsSnapshots = true
}

// runs before each test in this suite
Expand All @@ -386,35 +385,6 @@ func (ts *IAVLStoreTestSuite) SetupTest() {
require.NoError(err)
}

func (ts *IAVLStoreTestSuite) TestSnapshotRange() {
prefixes, entries := populateStore(ts.store)
ts.VerifyRange(ts.store, prefixes, entries)

// snapshot shouldn't see data that hasn't been saved to disk,
// but this store doesn't have real snapshots so the snapshot is expected to contain the same
// unsaved state as the store itself...
func() {
snap := ts.store.GetSnapshot()
defer snap.Release()

ts.VerifyRange(snap, prefixes, entries)
}()

ts.store.SaveVersion()

// snapshot should see all the data that was saved to disk
func() {
snap := ts.store.GetSnapshot()
defer snap.Release()

ts.VerifyRange(snap, prefixes, entries)
}()
}

func (ts *IAVLStoreTestSuite) TestConcurrentSnapshots() {
ts.VerifyConcurrentSnapshots()
}

//
// MemStore - broken in various ways, dunno why we even have this.
//
Expand All @@ -434,7 +404,6 @@ func (ts *MemStoreTestSuite) SetupTest() {

func (ts *MemStoreTestSuite) SetupSuite() {
ts.StoreName = "MemStore"
ts.supportsSnapshots = false
}

func TestIAVLStoreKeepsAllVersionsIfMaxVersionsIsZero(t *testing.T) {
Expand Down
25 changes: 7 additions & 18 deletions store/versioned_cachingstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/allegro/bigcache"
Expand Down Expand Up @@ -362,35 +363,23 @@ func (c *versionedCachingStore) Set(key, val []byte) {
func (c *versionedCachingStore) SaveVersion() ([]byte, int64, error) {
hash, version, err := c.VersionedKVStore.SaveVersion()
if err == nil {
// Cache version is always 1 block ahead of KV store version, that way when
// GetSnapshot() is called it won't return the current unpersisted state of the cache,
// but rather the last persisted version.
c.version = version + 1
if err = c.cache.Set(rootKey, GetEVMRootFromAppStore(c.VersionedKVStore), version); err != nil {
// Only log error and dont error out
cacheErrors.With("cache_operation", "set").Add(1)
c.logger.Error("[VersionedCachingStore] error while caching EVM root", "err", err)
}
// Cache version is always 1 block ahead of KV store version, that way when
// GetSnapshotAt(0) is called it won't return the current unpersisted state of the cache,
// but rather the last persisted version.
// GetSnapshotAt may be called concurrently so the version must be updated atomically.
atomic.StoreInt64(&c.version, version+1)
}
return hash, version, err
}

func (c *versionedCachingStore) GetSnapshot() Snapshot {
snapshot, err := c.GetSnapshotAt(0)
if err != nil {
panic(err)
}
return snapshot
}

func (c *versionedCachingStore) GetSnapshotAt(version int64) (Snapshot, error) {
// TODO: c.version & c.VersionedKVStore.GetSnapshot() could end up corresponding to different
// versions, need to do this atomically.
if version == 0 {
return newVersionedCachingStoreSnapshot(
c.VersionedKVStore.GetSnapshot(),
c.cache, c.version-1, c.logger,
), nil
version = atomic.LoadInt64(&c.version) - 1
}

snapshot, err := c.VersionedKVStore.GetSnapshotAt(version)
Expand Down
Loading