From 291633021b939a76107333e41fabc917f58fcabf Mon Sep 17 00:00:00 2001 From: Vadim Macagon Date: Wed, 29 Jan 2020 16:47:15 +0700 Subject: [PATCH] Eliminate VersionedKVStore.GetSnapshot() This is a follow up to PR #1532, cleans up a couple of issues left over from that PR: - Eliminate GetSnapshot() from all the stores, only GetSnapshotAt() is used now. - Fix VersionedCachingStore.GetSnapshotAt() to be atomic. Also cleaned up the tests a little bit so they test using real stores instead of buggy mock stores. --- store/iavlstore.go | 7 -- store/logstore.go | 4 - store/memstore.go | 4 - store/multi_writer_app_store.go | 10 +- store/multi_writer_app_store_test.go | 42 ++++++- store/store.go | 1 - store/store_test.go | 77 ++++-------- store/versioned_cachingstore.go | 25 ++-- store/versioned_cachingstore_test.go | 170 ++++++++------------------- 9 files changed, 121 insertions(+), 219 deletions(-) diff --git a/store/iavlstore.go b/store/iavlstore.go index 01b79db71f..78810dd311 100755 --- a/store/iavlstore.go +++ b/store/iavlstore.go @@ -209,13 +209,6 @@ func (s *IAVLStore) Prune() error { return nil } -func (s *IAVLStore) GetSnapshot() Snapshot { - // This isn't an actual snapshot obviously, and never will be, but lets pretend... - return &iavlStoreSnapshot{ - IAVLStore: s, - } -} - func (s *IAVLStore) GetSnapshotAt(version int64) (Snapshot, error) { panic("not implemented") } diff --git a/store/logstore.go b/store/logstore.go index 4de507b068..545ae3a123 100644 --- a/store/logstore.go +++ b/store/logstore.go @@ -126,10 +126,6 @@ func (s *LogStore) Prune() error { return s.store.Prune() } -func (s *LogStore) GetSnapshot() Snapshot { - return s.store.GetSnapshot() -} - func (s *LogStore) GetSnapshotAt(version int64) (Snapshot, error) { return s.store.GetSnapshotAt(version) } diff --git a/store/memstore.go b/store/memstore.go index dfe89e143c..371277500b 100644 --- a/store/memstore.go +++ b/store/memstore.go @@ -76,10 +76,6 @@ func (m *MemStore) Prune() error { return nil } -func (m *MemStore) GetSnapshot() Snapshot { - panic("not implemented") -} - func (m *MemStore) GetSnapshotAt(version int64) (Snapshot, error) { panic("not implemented") } diff --git a/store/multi_writer_app_store.go b/store/multi_writer_app_store.go index a731a9bfee..ab2c545bae 100644 --- a/store/multi_writer_app_store.go +++ b/store/multi_writer_app_store.go @@ -56,7 +56,7 @@ func init() { Namespace: "loomchain", Subsystem: "multi_writer_appstore", Name: "get_snapshot", - Help: "How long MultiWriterAppStore.GetSnapshot() took to execute (in seconds)", + Help: "How long MultiWriterAppStore.GetSnapshotAt() took to execute (in seconds)", Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, }, []string{}, ) @@ -272,14 +272,6 @@ func (s *MultiWriterAppStore) Prune() error { return s.appStore.Prune() } -func (s *MultiWriterAppStore) GetSnapshot() Snapshot { - snapshot, err := s.GetSnapshotAt(0) - if err != nil { - panic(err) - } - return snapshot -} - func (s *MultiWriterAppStore) GetSnapshotAt(version int64) (Snapshot, error) { defer func(begin time.Time) { getSnapshotDuration.Observe(time.Since(begin).Seconds()) diff --git a/store/multi_writer_app_store_test.go b/store/multi_writer_app_store_test.go index 1e3e9e6a2d..e7d5802c72 100644 --- a/store/multi_writer_app_store_test.go +++ b/store/multi_writer_app_store_test.go @@ -110,7 +110,8 @@ func (m *MultiWriterAppStoreTestSuite) TestMultiWriterAppStoreSnapshotFlushInter store.Set([]byte("test2"), []byte("test2v2")) // this snapshot is from memory - snapshotv1 := store.GetSnapshot() + snapshotv1, err := store.GetSnapshotAt(0) + require.NoError(err) require.Equal([]byte("test1"), snapshotv1.Get([]byte("test1"))) require.Equal([]byte("test2"), snapshotv1.Get([]byte("test2"))) @@ -119,7 +120,8 @@ func (m *MultiWriterAppStoreTestSuite) TestMultiWriterAppStoreSnapshotFlushInter require.NoError(err) // get snapshotv2 - snapshotv2 := store.GetSnapshot() + snapshotv2, err := store.GetSnapshotAt(0) + require.NoError(err) require.Equal([]byte("test1v2"), snapshotv2.Get([]byte("test1"))) require.Equal([]byte("test2v2"), snapshotv2.Get([]byte("test2"))) @@ -252,6 +254,42 @@ func (m *MultiWriterAppStoreTestSuite) TestIAVLRangeWithlimit() { require.Equal(4, len(rangeData)) } +func (m *MultiWriterAppStoreTestSuite) TestStoreRange() { + require := m.Require() + mws, err := mockMultiWriterStore(0, 0) + require.NoError(err) + prefixes, entries := populateStore(mws) + verifyRange(require, "MultiWriterAppStore", mws, prefixes, entries) + _, _, err = mws.SaveVersion() + require.NoError(err) + verifyRange(require, "MultiWriterAppStore", mws, prefixes, entries) +} + +func (m *MultiWriterAppStoreTestSuite) TestSnapshotRange() { + require := m.Require() + mws, err := mockMultiWriterStore(0, 0) + require.NoError(err) + prefixes, entries := populateStore(mws) + verifyRange(require, "MultiWriterAppStore", mws, prefixes, entries) + mws.SaveVersion() + + // snapshot should see all the data that was saved to disk + func() { + snap, err := mws.GetSnapshotAt(0) + require.NoError(err) + defer snap.Release() + + verifyRange(require, "MultiWriterAppStoreSnapshot", snap, prefixes, entries) + }() +} + +func (m *MultiWriterAppStoreTestSuite) TestConcurrentSnapshots() { + require := m.Require() + mws, err := mockMultiWriterStore(0, 0) + require.NoError(err) + verifyConcurrentSnapshots(require, mws) +} + func mockMultiWriterStore(appStoreFlushInterval, evmStoreFlushInterval int64) (*MultiWriterAppStore, error) { // Using different flush intervals for the app & evm stores is not supported. if appStoreFlushInterval > 0 && evmStoreFlushInterval > 0 && appStoreFlushInterval != evmStoreFlushInterval { diff --git a/store/store.go b/store/store.go index 85e0c7fe72..d8422ac511 100644 --- a/store/store.go +++ b/store/store.go @@ -53,7 +53,6 @@ type VersionedKVStore interface { SaveVersion() ([]byte, int64, error) // Delete old version of the store Prune() error - GetSnapshot() Snapshot GetSnapshotAt(version int64) (Snapshot, error) } diff --git a/store/store_test.go b/store/store_test.go index 98f270bc2f..55a9431caf 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -184,8 +184,6 @@ type StoreTestSuite struct { suite.Suite store VersionedKVStore StoreName string - //nolint:unused,structcheck - supportsSnapshots bool } func populateStore(s KVWriter) ([][]byte, []*plugin.RangeEntry) { @@ -219,8 +217,9 @@ func populateStore(s KVWriter) ([][]byte, []*plugin.RangeEntry) { return prefixes, entries } -func (ts *StoreTestSuite) VerifyRange(s KVReader, prefixes [][]byte, entries []*plugin.RangeEntry) { - require := ts.Require() +func verifyRange( + require *require.Assertions, storeName string, s KVReader, prefixes [][]byte, entries []*plugin.RangeEntry, +) { // TODO: This passed before the last Tendermint upgrade, doesn't anymore, figure out why. /* expected := []*plugin.RangeEntry{ @@ -237,8 +236,8 @@ func (ts *StoreTestSuite) VerifyRange(s KVReader, prefixes [][]byte, entries []* } */ require.Len(s.Range([]byte("abc123")), 1) - require.EqualValues([]byte{}, s.Range([]byte("abc123"))[0].Key, ts.StoreName) - require.EqualValues(entries[1].Value, s.Range([]byte("abc123"))[0].Value, ts.StoreName) + require.EqualValues([]byte{}, s.Range([]byte("abc123"))[0].Key, storeName) + require.EqualValues(entries[1].Value, s.Range([]byte("abc123"))[0].Value, storeName) key2, err := util.UnprefixKey(entries[2].Key, prefixes[0]) require.NoError(err) @@ -253,10 +252,10 @@ func (ts *StoreTestSuite) VerifyRange(s KVReader, prefixes [][]byte, entries []* {key4, entries[4].Value}, } actual := s.Range(prefixes[0]) - require.Len(actual, len(expected), ts.StoreName) - if ts.StoreName != "MemStore" { + require.Len(actual, len(expected), storeName) + if storeName != "MemStore" { for i := range expected { - require.EqualValues(expected[i], actual[i], ts.StoreName) + require.EqualValues(expected[i], actual[i], storeName) } } @@ -275,12 +274,12 @@ func (ts *StoreTestSuite) VerifyRange(s KVReader, prefixes [][]byte, entries []* {key8, entries[8].Value}, } actual = s.Range(prefixes[1]) - require.Len(actual, len(expected), ts.StoreName) + require.Len(actual, len(expected), storeName) // TODO: MemStore keys should be iterated in ascending order - if ts.StoreName != "MemStore" { + if storeName != "MemStore" { for i := range expected { - require.EqualValues(expected[i], actual[i], ts.StoreName) + require.EqualValues(expected[i], actual[i], storeName) } } @@ -295,10 +294,10 @@ func (ts *StoreTestSuite) VerifyRange(s KVReader, prefixes [][]byte, entries []* {key10, entries[10].Value}, } actual = s.Range(prefixes[2]) - require.Len(actual, len(expected), ts.StoreName) - if ts.StoreName != "MemStore" { + require.Len(actual, len(expected), storeName) + if storeName != "MemStore" { for i := range expected { - require.EqualValues(expected[i], actual[i], ts.StoreName) + require.EqualValues(expected[i], actual[i], storeName) } } } @@ -306,14 +305,13 @@ func (ts *StoreTestSuite) VerifyRange(s KVReader, prefixes [][]byte, entries []* func (ts *StoreTestSuite) TestStoreRange() { require := ts.Require() prefixes, entries := populateStore(ts.store) - ts.VerifyRange(ts.store, prefixes, entries) + verifyRange(require, ts.StoreName, ts.store, prefixes, entries) _, _, err := ts.store.SaveVersion() require.NoError(err) - ts.VerifyRange(ts.store, prefixes, entries) + verifyRange(require, ts.StoreName, ts.store, prefixes, entries) } -func (ts *StoreTestSuite) VerifyConcurrentSnapshots() { - require := ts.Require() +func verifyConcurrentSnapshots(require *require.Assertions, s VersionedKVStore) { // start one writer go-routine and a bunch of reader go-routines var wg sync.WaitGroup numOps := 10000 @@ -324,13 +322,13 @@ func (ts *StoreTestSuite) VerifyConcurrentSnapshots() { defer wg.Done() for i := 0; i < numOps; i++ { - ts.store.Set([]byte(fmt.Sprintf("key/%d", i)), []byte(fmt.Sprintf("value/%d", i))) + s.Set([]byte(fmt.Sprintf("key/%d", i)), []byte(fmt.Sprintf("value/%d", i))) if i%10 == 0 { - _, _, err := ts.store.SaveVersion() + _, _, err := s.SaveVersion() require.NoError(err) } } - _, _, err := ts.store.SaveVersion() + _, _, err := s.SaveVersion() require.NoError(err) }() wg.Wait() @@ -347,7 +345,9 @@ func (ts *StoreTestSuite) VerifyConcurrentSnapshots() { if snap != nil { snap.Release() } - snap = ts.store.GetSnapshot() + var err error + snap, err = s.GetSnapshotAt(0) + require.NoError(err) } snap.Get([]byte(fmt.Sprintf("key/%d", i))) } @@ -374,7 +374,6 @@ type IAVLStoreTestSuite struct { func (ts *IAVLStoreTestSuite) SetupSuite() { ts.StoreName = "IAVLStore" - ts.supportsSnapshots = true } // runs before each test in this suite @@ -386,35 +385,6 @@ func (ts *IAVLStoreTestSuite) SetupTest() { require.NoError(err) } -func (ts *IAVLStoreTestSuite) TestSnapshotRange() { - prefixes, entries := populateStore(ts.store) - ts.VerifyRange(ts.store, prefixes, entries) - - // snapshot shouldn't see data that hasn't been saved to disk, - // but this store doesn't have real snapshots so the snapshot is expected to contain the same - // unsaved state as the store itself... - func() { - snap := ts.store.GetSnapshot() - defer snap.Release() - - ts.VerifyRange(snap, prefixes, entries) - }() - - ts.store.SaveVersion() - - // snapshot should see all the data that was saved to disk - func() { - snap := ts.store.GetSnapshot() - defer snap.Release() - - ts.VerifyRange(snap, prefixes, entries) - }() -} - -func (ts *IAVLStoreTestSuite) TestConcurrentSnapshots() { - ts.VerifyConcurrentSnapshots() -} - // // MemStore - broken in various ways, dunno why we even have this. // @@ -434,7 +404,6 @@ func (ts *MemStoreTestSuite) SetupTest() { func (ts *MemStoreTestSuite) SetupSuite() { ts.StoreName = "MemStore" - ts.supportsSnapshots = false } func TestIAVLStoreKeepsAllVersionsIfMaxVersionsIsZero(t *testing.T) { diff --git a/store/versioned_cachingstore.go b/store/versioned_cachingstore.go index 95ec39d1b3..d7ed09b451 100644 --- a/store/versioned_cachingstore.go +++ b/store/versioned_cachingstore.go @@ -6,6 +6,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" "github.com/allegro/bigcache" @@ -362,35 +363,23 @@ func (c *versionedCachingStore) Set(key, val []byte) { func (c *versionedCachingStore) SaveVersion() ([]byte, int64, error) { hash, version, err := c.VersionedKVStore.SaveVersion() if err == nil { - // Cache version is always 1 block ahead of KV store version, that way when - // GetSnapshot() is called it won't return the current unpersisted state of the cache, - // but rather the last persisted version. - c.version = version + 1 if err = c.cache.Set(rootKey, GetEVMRootFromAppStore(c.VersionedKVStore), version); err != nil { // Only log error and dont error out cacheErrors.With("cache_operation", "set").Add(1) c.logger.Error("[VersionedCachingStore] error while caching EVM root", "err", err) } + // Cache version is always 1 block ahead of KV store version, that way when + // GetSnapshotAt(0) is called it won't return the current unpersisted state of the cache, + // but rather the last persisted version. + // GetSnapshotAt may be called concurrently so the version must be updated atomically. + atomic.StoreInt64(&c.version, version+1) } return hash, version, err } -func (c *versionedCachingStore) GetSnapshot() Snapshot { - snapshot, err := c.GetSnapshotAt(0) - if err != nil { - panic(err) - } - return snapshot -} - func (c *versionedCachingStore) GetSnapshotAt(version int64) (Snapshot, error) { - // TODO: c.version & c.VersionedKVStore.GetSnapshot() could end up corresponding to different - // versions, need to do this atomically. if version == 0 { - return newVersionedCachingStoreSnapshot( - c.VersionedKVStore.GetSnapshot(), - c.cache, c.version-1, c.logger, - ), nil + version = atomic.LoadInt64(&c.version) - 1 } snapshot, err := c.VersionedKVStore.GetSnapshotAt(version) diff --git a/store/versioned_cachingstore_test.go b/store/versioned_cachingstore_test.go index 0315046c25..2239c36561 100644 --- a/store/versioned_cachingstore_test.go +++ b/store/versioned_cachingstore_test.go @@ -3,153 +3,83 @@ package store import ( "testing" - "github.com/loomnetwork/go-loom/plugin" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) -type MockStore struct { - storage map[string][]byte - version int64 -} - -func NewMockStore() *MockStore { - return &MockStore{ - storage: make(map[string][]byte), - version: 0, - } -} - -func (m *MockStore) Get(key []byte) []byte { - return m.storage[string(key)] -} - -func (m *MockStore) Has(key []byte) bool { - return m.storage[string(key)] != nil -} - -func (m *MockStore) Set(key []byte, value []byte) { - m.storage[string(key)] = value -} - -func (m *MockStore) Delete(key []byte) { - delete(m.storage, string(key)) -} - -func (m *MockStore) Range(prefix []byte) plugin.RangeData { - return nil -} - -func (m *MockStore) Hash() []byte { - return nil -} - -func (m *MockStore) Version() int64 { - return m.version -} - -func (m *MockStore) SaveVersion() ([]byte, int64, error) { - m.version = m.version + 1 - return nil, m.version, nil -} - -func (m *MockStore) Prune() error { - return nil -} - -func (m *MockStore) GetSnapshot() Snapshot { - snapshotStore := make(map[string][]byte) - for k, v := range m.storage { - snapshotStore[k] = v - } - return &mockStoreSnapshot{ - MockStore: &MockStore{ - storage: snapshotStore, - }, - } -} - -func (m *MockStore) GetSnapshotAt(version int64) (Snapshot, error) { - panic("not implemented") -} - -type mockStoreSnapshot struct { - *MockStore -} - -func (s *mockStoreSnapshot) Release() { - // noop -} - func TestCachingStoreVersion(t *testing.T) { defaultConfig := DefaultCachingStoreConfig() defaultConfig.CachingEnabled = true - mockStore := NewMockStore() - - versionedStore, err := NewVersionedCachingStore(mockStore, defaultConfig, mockStore.Version()) - cachingStore := versionedStore.(*versionedCachingStore) - + mockStore, err := mockMultiWriterStore(0, 0) require.NoError(t, err) key1 := []byte("key1") key2 := []byte("key2") key3 := []byte("key3") - mockStore.Set(key1, []byte("value1")) - mockStore.Set(key2, []byte("value2")) - mockStore.Set(key3, []byte("value3")) + versionedStore, err := NewVersionedCachingStore(mockStore, defaultConfig, mockStore.Version()) + require.NoError(t, err) - snapshotv0 := cachingStore.GetSnapshot() + versionedStore.Set(key1, []byte("value1")) + versionedStore.Set(key2, []byte("value2")) + versionedStore.Set(key3, []byte("value3")) + + snapshotv0, err := versionedStore.GetSnapshotAt(0) + require.NoError(t, err) - // cachingStoreSnapshot will cache key1 in memory as version 0 + // snapshot should be empty because values haven't been persisted to the underlying store cachedValue := snapshotv0.Get(key1) - assert.Equal(t, "value1", string(cachedValue), "cachingstore read needs to be consistent with underlying store") - // Set data directly without update the cache, caching store should return old data - mockStore.Set(key2, []byte("value2")) - cachedValue = snapshotv0.Get([]byte("key1")) - assert.Equal(t, "value1", string(cachedValue), "cachingstore need to fetch key directly from the backing store") + assert.Equal(t, "", string(cachedValue), "snapshot should be empty") - // save to bump up version - _, version, _ := cachingStore.SaveVersion() + _, version, _ := versionedStore.SaveVersion() assert.Equal(t, int64(1), version, "version must be updated to 1") - // save data into version 1 - cachingStore.Set(key2, []byte("newvalue2")) - cachingStore.Set(key3, []byte("newvalue3")) - snapshotv1 := cachingStore.GetSnapshot() - cachedValue = snapshotv1.Get(key2) - assert.Equal(t, "newvalue2", string(cachedValue), "snapshotv1 should get correct value") - cachedValue = snapshotv1.Get(key1) - assert.Equal(t, "value1", string(cachedValue), "snapshotv1 should get correct value") - // snapshotv0 should not get updated + // previously obtained snapshot should still be empty since it shouldn't be affected by changes + // to the underlying store cachedValue = snapshotv0.Get(key1) - assert.Equal(t, "value1", string(cachedValue), "snapshotv0 should get correct value") - cachedValue = snapshotv0.Get(key2) - assert.Equal(t, "value2", string(cachedValue), "snapshotv0 should get correct value") - cachedValue = snapshotv0.Get(key3) - assert.Equal(t, "value3", string(cachedValue), "snapshotv0 should get correct value") + assert.Equal(t, "", string(cachedValue), "snapshot should be empty") - cacheSnapshot := snapshotv0.(*versionedCachingStoreSnapshot) - cacheSnapshot.cache.Delete(key1, 1) // evict a key - cachedValue = snapshotv0.Get(key1) // call an evicted key - assert.Equal(t, "value1", string(cachedValue), "snapshotv1 should get correct value, fetching from underlying snapshot") + snapshotv1, err := versionedStore.GetSnapshotAt(0) + require.NoError(t, err) + + // new snapshot should contain the previously persisted values + cachedValue = snapshotv1.Get(key1) + assert.Equal(t, "value1", string(cachedValue), "value should match the one persisted to the underlying store") + // existing snapshot should be unaffected by unpersisted changes to the store + versionedStore.Set(key1, []byte("newvalue1")) + cachedValue = snapshotv1.Get(key1) + assert.Equal(t, "value1", string(cachedValue), "snapshot should not be affected by changes to the underlying store") // save to bump up version - _, version, _ = cachingStore.SaveVersion() + _, version, _ = versionedStore.SaveVersion() assert.Equal(t, int64(2), version, "version must be updated to 2") - snapshotv2 := cachingStore.GetSnapshot() + + // existing snapshot should be unaffected by persisted changes to the store + cachedValue = snapshotv1.Get(key1) + assert.Equal(t, "value1", string(cachedValue), "snapshot should not be affected by changes to the uderlying store") + + // save data into version 3 + versionedStore.Set(key2, []byte("newvalue2")) + versionedStore.Set(key3, []byte("newvalue3")) + + _, version, _ = versionedStore.SaveVersion() + assert.Equal(t, int64(3), version, "version must be updated to 3") + + snapshotv2, err := versionedStore.GetSnapshotAt(0) + require.NoError(t, err) cachedValue = snapshotv2.Get(key1) - assert.Equal(t, "value1", string(cachedValue), "snapshotv2 should get the value from cache") + assert.Equal(t, "newvalue1", string(cachedValue)) cachedValue = snapshotv2.Get(key2) - assert.Equal(t, "newvalue2", string(cachedValue), "snapshotv2 should get the value from cache") + assert.Equal(t, "newvalue2", string(cachedValue)) cachedValue = snapshotv2.Get(key3) - assert.Equal(t, "newvalue3", string(cachedValue), "snapshotv2 should get the value from cache") + assert.Equal(t, "newvalue3", string(cachedValue)) - // evict data from key table - cacheSnapshot = snapshotv1.(*versionedCachingStoreSnapshot) - cacheSnapshot.cache.cache.Delete(string(key1)) // evict a key table - cachedValue = snapshotv2.Get(key1) - assert.Equal(t, "value1", string(cachedValue), "snapshotv2 should get the value from cache") + // snapshotv1 should remain unchanged + cachedValue = snapshotv1.Get(key1) + assert.Equal(t, "value1", string(cachedValue)) + cachedValue = snapshotv1.Get(key2) + assert.Equal(t, "value2", string(cachedValue)) + cachedValue = snapshotv1.Get(key3) + assert.Equal(t, "value3", string(cachedValue)) }