Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(store/v2): pruning manager #20430

Merged
merged 12 commits into from
May 29, 2024
17 changes: 15 additions & 2 deletions store/v2/commitment/iavl/tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,15 @@ import (

"cosmossdk.io/core/log"
corestore "cosmossdk.io/core/store"
"cosmossdk.io/store/v2"
"cosmossdk.io/store/v2/commitment"
dbm "cosmossdk.io/store/v2/db"
)

var _ commitment.Tree = (*IavlTree)(nil)
var (
_ commitment.Tree = (*IavlTree)(nil)
_ store.PausablePruner = (*IavlTree)(nil)
)

// IavlTree is a wrapper around iavl.MutableTree.
type IavlTree struct {
Expand All @@ -21,7 +25,7 @@ type IavlTree struct {

// NewIavlTree creates a new IavlTree instance.
func NewIavlTree(db corestore.KVStoreWithBatch, logger log.Logger, cfg *Config) *IavlTree {
tree := iavl.NewMutableTree(dbm.NewWrapper(db), cfg.CacheSize, cfg.SkipFastStorageUpgrade, logger)
tree := iavl.NewMutableTree(dbm.NewWrapper(db), cfg.CacheSize, cfg.SkipFastStorageUpgrade, logger, iavl.AsyncPruningOption(true))
return &IavlTree{
tree: tree,
}
Expand Down Expand Up @@ -98,6 +102,15 @@ func (t *IavlTree) Prune(version uint64) error {
return t.tree.DeleteVersionsTo(int64(version))
}

// PausePruning pauses the pruning process.
func (t *IavlTree) PausePruning(pause bool) {
if pause {
t.tree.SetCommitting()
} else {
t.tree.UnsetCommitting()
}
}

// Export exports the tree exporter at the given version.
func (t *IavlTree) Export(version uint64) (commitment.Exporter, error) {
tree, err := t.tree.GetImmutable(int64(version))
Expand Down
16 changes: 11 additions & 5 deletions store/v2/commitment/iavl/tree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,27 @@ package iavl

import (
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"

"cosmossdk.io/core/log"
corestore "cosmossdk.io/core/store"
"cosmossdk.io/store/v2"
"cosmossdk.io/store/v2/commitment"
dbm "cosmossdk.io/store/v2/db"
)

func TestCommitterSuite(t *testing.T) {
s := &commitment.CommitStoreTestSuite{
NewStore: func(db corestore.KVStoreWithBatch, storeKeys []string, pruneOpts *store.PruneOptions, logger log.Logger) (*commitment.CommitStore, error) {
NewStore: func(db corestore.KVStoreWithBatch, storeKeys []string, logger log.Logger) (*commitment.CommitStore, error) {
multiTrees := make(map[string]commitment.Tree)
cfg := DefaultConfig()
for _, storeKey := range storeKeys {
prefixDB := dbm.NewPrefixDB(db, []byte(storeKey))
multiTrees[storeKey] = NewIavlTree(prefixDB, logger, cfg)
}
return commitment.NewCommitStore(multiTrees, db, pruneOpts, logger)
return commitment.NewCommitStore(multiTrees, db, logger)
},
}

Expand Down Expand Up @@ -100,8 +100,14 @@ func TestIavlTree(t *testing.T) {
err = tree.Prune(1)
require.NoError(t, err)
require.Equal(t, uint64(3), tree.GetLatestVersion())
err = tree.LoadVersion(1)
require.Error(t, err)
// async pruning check
checkErr := func() bool {
if _, err := tree.tree.LoadVersion(1); err != nil {
return true
}
return false
}
require.Eventually(t, checkErr, 2*time.Second, 100*time.Millisecond)

// load version 2
err = tree.LoadVersion(2)
Expand Down
36 changes: 16 additions & 20 deletions store/v2/commitment/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
var (
_ store.Committer = (*CommitStore)(nil)
_ snapshots.CommitSnapshotter = (*CommitStore)(nil)
_ store.PausablePruner = (*CommitStore)(nil)
)

// CommitStore is a wrapper around multiple Tree objects mapped by a unique store
Expand All @@ -39,26 +40,18 @@
logger log.Logger
db corestore.KVStoreWithBatch
multiTrees map[string]Tree

// pruneOptions is the pruning configuration.
pruneOptions *store.PruneOptions // TODO are there no default prune options?
}

// NewCommitStore creates a new CommitStore instance.
func NewCommitStore(trees map[string]Tree, db corestore.KVStoreWithBatch, pruneOpts *store.PruneOptions, logger log.Logger) (*CommitStore, error) {
if pruneOpts == nil {
pruneOpts = store.DefaultPruneOptions()
}

func NewCommitStore(trees map[string]Tree, db corestore.KVStoreWithBatch, logger log.Logger) (*CommitStore, error) {
return &CommitStore{
logger: logger,
db: db,
multiTrees: trees,
pruneOptions: pruneOpts,
logger: logger,
db: db,
multiTrees: trees,
}, nil
}

func (c *CommitStore) WriteBatch(cs *corestore.Changeset) error {
func (c *CommitStore) WriteChangeset(cs *corestore.Changeset) error {
for _, pairs := range cs.Changes {

key := conv.UnsafeBytesToStr(pairs.Actor)
Expand Down Expand Up @@ -237,13 +230,6 @@
return nil, err
}

// Prune the old versions.
if prune, pruneVersion := c.pruneOptions.ShouldPrune(version); prune {
if err := c.Prune(pruneVersion); err != nil {
c.logger.Info("failed to prune SC", "prune_version", pruneVersion, "err", err)
}
}

return cInfo, nil
}

Expand Down Expand Up @@ -297,6 +283,7 @@
return bz, nil
}

// Prune implements store.Pruner.
func (c *CommitStore) Prune(version uint64) (ferr error) {
// prune the metadata
batch := c.db.NewBatch()
Expand All @@ -322,6 +309,15 @@
return ferr
}

// PausePruning implements store.PausablePruner.
func (c *CommitStore) PausePruning(pause bool) {
for _, tree := range c.multiTrees {
if pruner, ok := tree.(store.PausablePruner); ok {
pruner.PausePruning(pause)
}
}
Dismissed Show dismissed Hide dismissed
}

// Snapshot implements snapshotstypes.CommitSnapshotter.
func (c *CommitStore) Snapshot(version uint64, protoWriter protoio.Writer) error {
if version == 0 {
Expand Down
17 changes: 11 additions & 6 deletions store/v2/commitment/store_test_suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ const (
type CommitStoreTestSuite struct {
suite.Suite

NewStore func(db corestore.KVStoreWithBatch, storeKeys []string, pruneOpts *store.PruneOptions, logger log.Logger) (*CommitStore, error)
NewStore func(db corestore.KVStoreWithBatch, storeKeys []string, logger log.Logger) (*CommitStore, error)
}

func (s *CommitStoreTestSuite) TestStore_Snapshotter() {
storeKeys := []string{storeKey1, storeKey2}
commitStore, err := s.NewStore(dbm.NewMemDB(), storeKeys, nil, log.NewNopLogger())
commitStore, err := s.NewStore(dbm.NewMemDB(), storeKeys, log.NewNopLogger())
s.Require().NoError(err)

latestVersion := uint64(10)
Expand All @@ -45,7 +45,7 @@ func (s *CommitStoreTestSuite) TestStore_Snapshotter() {
kvPairs[storeKey] = append(kvPairs[storeKey], corestore.KVPair{Key: key, Value: value})
}
}
s.Require().NoError(commitStore.WriteBatch(corestore.NewChangesetWithPairs(kvPairs)))
s.Require().NoError(commitStore.WriteChangeset(corestore.NewChangesetWithPairs(kvPairs)))

_, err = commitStore.Commit(i)
s.Require().NoError(err)
Expand All @@ -64,7 +64,7 @@ func (s *CommitStoreTestSuite) TestStore_Snapshotter() {
},
}

targetStore, err := s.NewStore(dbm.NewMemDB(), storeKeys, nil, log.NewNopLogger())
targetStore, err := s.NewStore(dbm.NewMemDB(), storeKeys, log.NewNopLogger())
s.Require().NoError(err)

chunks := make(chan io.ReadCloser, kvCount*int(latestVersion))
Expand Down Expand Up @@ -129,7 +129,7 @@ func (s *CommitStoreTestSuite) TestStore_Pruning() {
KeepRecent: 10,
Interval: 5,
}
commitStore, err := s.NewStore(dbm.NewMemDB(), storeKeys, pruneOpts, log.NewNopLogger())
commitStore, err := s.NewStore(dbm.NewMemDB(), storeKeys, log.NewNopLogger())
s.Require().NoError(err)

latestVersion := uint64(100)
Expand All @@ -144,10 +144,15 @@ func (s *CommitStoreTestSuite) TestStore_Pruning() {
kvPairs[storeKey] = append(kvPairs[storeKey], corestore.KVPair{Key: key, Value: value})
}
}
s.Require().NoError(commitStore.WriteBatch(corestore.NewChangesetWithPairs(kvPairs)))
s.Require().NoError(commitStore.WriteChangeset(corestore.NewChangesetWithPairs(kvPairs)))

_, err = commitStore.Commit(i)
s.Require().NoError(err)

if prune, pruneVersion := pruneOpts.ShouldPrune(i); prune {
s.Require().NoError(commitStore.Prune(pruneVersion))
}

}

pruneVersion := latestVersion - pruneOpts.KeepRecent - 1
Expand Down
14 changes: 2 additions & 12 deletions store/v2/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,15 @@ type VersionedDatabase interface {

ApplyChangeset(version uint64, cs *corestore.Changeset) error

// Prune attempts to prune all versions up to and including the provided
// version argument. The operation should be idempotent. An error should be
// returned upon failure.
Prune(version uint64) error

// Close releases associated resources. It should NOT be idempotent. It must
// only be called once and any call after may panic.
io.Closer
}

// Committer defines an API for committing state.
type Committer interface {
// WriteBatch writes a batch of key-value pairs to the tree.
WriteBatch(cs *corestore.Changeset) error
// WriteChangeset writes the changeset to the commitment state.
WriteChangeset(cs *corestore.Changeset) error

// WorkingCommitInfo returns the CommitInfo for the working tree.
WorkingCommitInfo(version uint64) *proof.CommitInfo
Expand Down Expand Up @@ -62,11 +57,6 @@ type Committer interface {
// GetCommitInfo returns the CommitInfo for the given version.
GetCommitInfo(version uint64) (*proof.CommitInfo, error)

// Prune attempts to prune all versions up to and including the provided
// version argument. The operation should be idempotent. An error should be
// returned upon failure.
Prune(version uint64) error

// Close releases associated resources. It should NOT be idempotent. It must
// only be called once and any call after may panic.
io.Closer
Expand Down
2 changes: 1 addition & 1 deletion store/v2/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
cosmossdk.io/log v1.3.1
github.com/cockroachdb/pebble v1.1.0
github.com/cosmos/gogoproto v1.4.12
github.com/cosmos/iavl v1.1.4
github.com/cosmos/iavl v1.2.0
github.com/cosmos/ics23/go v0.10.0
github.com/google/btree v1.1.2
github.com/hashicorp/go-metrics v0.5.3
Expand Down
4 changes: 2 additions & 2 deletions store/v2/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ github.com/cosmos/cosmos-db v1.0.2 h1:hwMjozuY1OlJs/uh6vddqnk9j7VamLv+0DBlbEXbAK
github.com/cosmos/cosmos-db v1.0.2/go.mod h1:Z8IXcFJ9PqKK6BIsVOB3QXtkKoqUOp1vRvPT39kOXEA=
github.com/cosmos/gogoproto v1.4.12 h1:vB6Lbe/rtnYGjQuFxkPiPYiCybqFT8QvLipDZP8JpFE=
github.com/cosmos/gogoproto v1.4.12/go.mod h1:LnZob1bXRdUoqMMtwYlcR3wjiElmlC+FkjaZRv1/eLY=
github.com/cosmos/iavl v1.1.4 h1:Z0cVVjeQqOUp78/nWt/uhQy83vYluWlAMGQ4zbH9G34=
github.com/cosmos/iavl v1.1.4/go.mod h1:vCYmRQUJU1wwj0oRD3wMEtOM9sJNDP+GFMaXmIxZ/rU=
github.com/cosmos/iavl v1.2.0 h1:kVxTmjTh4k0Dh1VNL046v6BXqKziqMDzxo93oh3kOfM=
github.com/cosmos/iavl v1.2.0/go.mod h1:HidWWLVAtODJqFD6Hbne2Y0q3SdxByJepHUOeoH4LiI=
github.com/cosmos/ics23/go v0.10.0 h1:iXqLLgp2Lp+EdpIuwXTYIQU+AiHj9mOC2X9ab++bZDM=
github.com/cosmos/ics23/go v0.10.0/go.mod h1:ZfJSmng/TBNTBkFemHHHj5YY7VAU/MBU980F4VU1NG0=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
Expand Down
2 changes: 1 addition & 1 deletion store/v2/migration/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func (m *Manager) Sync() error {
return fmt.Errorf("failed to unmarshal changeset: %w", err)
}
if m.stateCommitment != nil {
if err := m.stateCommitment.WriteBatch(cs); err != nil {
if err := m.stateCommitment.WriteChangeset(cs); err != nil {
return fmt.Errorf("failed to write changeset to commitment: %w", err)
}
if _, err := m.stateCommitment.Commit(version); err != nil {
Expand Down
8 changes: 4 additions & 4 deletions store/v2/migration/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func setupMigrationManager(t *testing.T, noCommitStore bool) (*Manager, *commitm
multiTrees[storeKey] = iavl.NewIavlTree(prefixDB, log.NewNopLogger(), iavl.DefaultConfig())
}

commitStore, err := commitment.NewCommitStore(multiTrees, db, nil, log.NewNopLogger())
commitStore, err := commitment.NewCommitStore(multiTrees, db, log.NewNopLogger())
require.NoError(t, err)

snapshotsStore, err := snapshots.NewStore(t.TempDir())
Expand All @@ -38,7 +38,7 @@ func setupMigrationManager(t *testing.T, noCommitStore bool) (*Manager, *commitm

storageDB, err := pebbledb.New(t.TempDir())
require.NoError(t, err)
newStorageStore := storage.NewStorageStore(storageDB, nil, log.NewNopLogger()) // for store/v2
newStorageStore := storage.NewStorageStore(storageDB, log.NewNopLogger()) // for store/v2

db1 := dbm.NewMemDB()
multiTrees1 := make(map[string]commitment.Tree)
Expand All @@ -47,7 +47,7 @@ func setupMigrationManager(t *testing.T, noCommitStore bool) (*Manager, *commitm
multiTrees1[storeKey] = iavl.NewIavlTree(prefixDB, log.NewNopLogger(), iavl.DefaultConfig())
}

newCommitStore, err := commitment.NewCommitStore(multiTrees1, db1, nil, log.NewNopLogger()) // for store/v2
newCommitStore, err := commitment.NewCommitStore(multiTrees1, db1, log.NewNopLogger()) // for store/v2
require.NoError(t, err)
if noCommitStore {
newCommitStore = nil
Expand All @@ -71,7 +71,7 @@ func TestMigrateState(t *testing.T) {
cs.Add([]byte(storeKey), []byte(fmt.Sprintf("key-%d-%d", version, i)), []byte(fmt.Sprintf("value-%d-%d", version, i)), false)
}
}
require.NoError(t, orgCommitStore.WriteBatch(cs))
require.NoError(t, orgCommitStore.WriteChangeset(cs))
_, err := orgCommitStore.Commit(version)
require.NoError(t, err)
}
Expand Down
69 changes: 69 additions & 0 deletions store/v2/pruning/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package pruning
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add a small readme/doc.go on the design with a small diagram on how this is meant to be used


import "cosmossdk.io/store/v2"

// Manager is a struct that manages the pruning of old versions of the SC and SS.
type Manager struct {
// scPruner is the pruner for the SC.
scPruner store.Pruner
// scPruningOptions are the pruning options for the SC.
scPruningOptions *store.PruneOptions
// ssPruner is the pruner for the SS.
ssPruner store.Pruner
// ssPruningOptions are the pruning options for the SS.
ssPruningOptions *store.PruneOptions
}

// NewManager creates a new Pruning Manager.
func NewManager(scPruner, ssPruner store.Pruner, scPruningOptions, ssPruningOptions *store.PruneOptions) *Manager {
return &Manager{
scPruner: scPruner,
scPruningOptions: scPruningOptions,
ssPruner: ssPruner,
ssPruningOptions: ssPruningOptions,
}
}

// Prune prunes the SC and SS to the provided version.
//
// NOTE: It can be called outside of the store manually.
func (m *Manager) Prune(version uint64) error {
// Prune the SC.
if m.scPruningOptions != nil {
if prune, pruneTo := m.scPruningOptions.ShouldPrune(version); prune {
if err := m.scPruner.Prune(pruneTo); err != nil {
return err
}
}
}

// Prune the SS.
if m.ssPruningOptions != nil {
if prune, pruneTo := m.ssPruningOptions.ShouldPrune(version); prune {
if err := m.ssPruner.Prune(pruneTo); err != nil {
return err
}
}
}

return nil
}

// SignalCommit signals to the manager that a commit has started or finished.
// It is used to trigger the pruning of the SC and SS.
// It pauses or resumes the pruning of the SC and SS if the pruner implements
// the PausablePruner interface.
func (m *Manager) SignalCommit(start bool, version uint64) error {
if scPausablePruner, ok := m.scPruner.(store.PausablePruner); ok {
scPausablePruner.PausePruning(start)
}
if ssPausablePruner, ok := m.ssPruner.(store.PausablePruner); ok {
ssPausablePruner.PausePruning(start)
}

if !start {
return m.Prune(version)
}

return nil
}
Loading
Loading