Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(store/v2): pruning manager #20430

Merged
merged 12 commits into from
May 29, 2024
17 changes: 15 additions & 2 deletions store/v2/commitment/iavl/tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,15 @@ import (

"cosmossdk.io/core/log"
corestore "cosmossdk.io/core/store"
"cosmossdk.io/store/v2"
"cosmossdk.io/store/v2/commitment"
dbm "cosmossdk.io/store/v2/db"
)

var _ commitment.Tree = (*IavlTree)(nil)
var (
_ commitment.Tree = (*IavlTree)(nil)
_ store.PausablePruner = (*IavlTree)(nil)
)

// IavlTree is a wrapper around iavl.MutableTree.
type IavlTree struct {
Expand All @@ -21,7 +25,7 @@ type IavlTree struct {

// NewIavlTree creates a new IavlTree instance.
func NewIavlTree(db corestore.KVStoreWithBatch, logger log.Logger, cfg *Config) *IavlTree {
tree := iavl.NewMutableTree(dbm.NewWrapper(db), cfg.CacheSize, cfg.SkipFastStorageUpgrade, logger)
tree := iavl.NewMutableTree(dbm.NewWrapper(db), cfg.CacheSize, cfg.SkipFastStorageUpgrade, logger, iavl.AsyncPruningOption(true))
return &IavlTree{
tree: tree,
}
Expand Down Expand Up @@ -98,6 +102,15 @@ func (t *IavlTree) Prune(version uint64) error {
return t.tree.DeleteVersionsTo(int64(version))
}

// PausePruning pauses the pruning process.
func (t *IavlTree) PausePruning(pause bool) {
if pause {
t.tree.SetCommitting()
} else {
t.tree.UnsetCommitting()
}
}

// Export exports the tree exporter at the given version.
func (t *IavlTree) Export(version uint64) (commitment.Exporter, error) {
tree, err := t.tree.GetImmutable(int64(version))
Expand Down
16 changes: 11 additions & 5 deletions store/v2/commitment/iavl/tree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,27 @@ package iavl

import (
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"

"cosmossdk.io/core/log"
corestore "cosmossdk.io/core/store"
"cosmossdk.io/store/v2"
"cosmossdk.io/store/v2/commitment"
dbm "cosmossdk.io/store/v2/db"
)

func TestCommitterSuite(t *testing.T) {
s := &commitment.CommitStoreTestSuite{
NewStore: func(db corestore.KVStoreWithBatch, storeKeys []string, pruneOpts *store.PruneOptions, logger log.Logger) (*commitment.CommitStore, error) {
NewStore: func(db corestore.KVStoreWithBatch, storeKeys []string, logger log.Logger) (*commitment.CommitStore, error) {
multiTrees := make(map[string]commitment.Tree)
cfg := DefaultConfig()
for _, storeKey := range storeKeys {
prefixDB := dbm.NewPrefixDB(db, []byte(storeKey))
multiTrees[storeKey] = NewIavlTree(prefixDB, logger, cfg)
}
return commitment.NewCommitStore(multiTrees, db, pruneOpts, logger)
return commitment.NewCommitStore(multiTrees, db, logger)
},
}

Expand Down Expand Up @@ -100,8 +100,14 @@ func TestIavlTree(t *testing.T) {
err = tree.Prune(1)
require.NoError(t, err)
require.Equal(t, uint64(3), tree.GetLatestVersion())
err = tree.LoadVersion(1)
require.Error(t, err)
// async pruning check
checkErr := func() bool {
if _, err := tree.tree.LoadVersion(1); err != nil {
return true
}
return false
}
require.Eventually(t, checkErr, 2*time.Second, 100*time.Millisecond)

// load version 2
err = tree.LoadVersion(2)
Expand Down
36 changes: 16 additions & 20 deletions store/v2/commitment/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const (
var (
_ store.Committer = (*CommitStore)(nil)
_ snapshots.CommitSnapshotter = (*CommitStore)(nil)
_ store.PausablePruner = (*CommitStore)(nil)
)

// CommitStore is a wrapper around multiple Tree objects mapped by a unique store
Expand All @@ -39,26 +40,18 @@ type CommitStore struct {
logger log.Logger
db corestore.KVStoreWithBatch
multiTrees map[string]Tree

// pruneOptions is the pruning configuration.
pruneOptions *store.PruneOptions // TODO are there no default prune options?
}

// NewCommitStore creates a new CommitStore instance.
func NewCommitStore(trees map[string]Tree, db corestore.KVStoreWithBatch, pruneOpts *store.PruneOptions, logger log.Logger) (*CommitStore, error) {
if pruneOpts == nil {
pruneOpts = store.DefaultPruneOptions()
}

func NewCommitStore(trees map[string]Tree, db corestore.KVStoreWithBatch, logger log.Logger) (*CommitStore, error) {
return &CommitStore{
logger: logger,
db: db,
multiTrees: trees,
pruneOptions: pruneOpts,
logger: logger,
db: db,
multiTrees: trees,
}, nil
}

func (c *CommitStore) WriteBatch(cs *corestore.Changeset) error {
func (c *CommitStore) WriteChangeset(cs *corestore.Changeset) error {
for _, pairs := range cs.Changes {

key := conv.UnsafeBytesToStr(pairs.Actor)
Expand Down Expand Up @@ -237,13 +230,6 @@ func (c *CommitStore) Commit(version uint64) (*proof.CommitInfo, error) {
return nil, err
}

// Prune the old versions.
if prune, pruneVersion := c.pruneOptions.ShouldPrune(version); prune {
if err := c.Prune(pruneVersion); err != nil {
c.logger.Info("failed to prune SC", "prune_version", pruneVersion, "err", err)
}
}

return cInfo, nil
}

Expand Down Expand Up @@ -297,6 +283,7 @@ func (c *CommitStore) Get(storeKey []byte, version uint64, key []byte) ([]byte,
return bz, nil
}

// Prune implements store.Pruner.
func (c *CommitStore) Prune(version uint64) (ferr error) {
// prune the metadata
batch := c.db.NewBatch()
Expand All @@ -322,6 +309,15 @@ func (c *CommitStore) Prune(version uint64) (ferr error) {
return ferr
}

// PausePruning implements store.PausablePruner.
func (c *CommitStore) PausePruning(pause bool) {
for _, tree := range c.multiTrees {
if pruner, ok := tree.(store.PausablePruner); ok {
pruner.PausePruning(pause)
}
}
Dismissed Show dismissed Hide dismissed
}

// Snapshot implements snapshotstypes.CommitSnapshotter.
func (c *CommitStore) Snapshot(version uint64, protoWriter protoio.Writer) error {
if version == 0 {
Expand Down
17 changes: 11 additions & 6 deletions store/v2/commitment/store_test_suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ const (
type CommitStoreTestSuite struct {
suite.Suite

NewStore func(db corestore.KVStoreWithBatch, storeKeys []string, pruneOpts *store.PruneOptions, logger log.Logger) (*CommitStore, error)
NewStore func(db corestore.KVStoreWithBatch, storeKeys []string, logger log.Logger) (*CommitStore, error)
}

func (s *CommitStoreTestSuite) TestStore_Snapshotter() {
storeKeys := []string{storeKey1, storeKey2}
commitStore, err := s.NewStore(dbm.NewMemDB(), storeKeys, nil, log.NewNopLogger())
commitStore, err := s.NewStore(dbm.NewMemDB(), storeKeys, log.NewNopLogger())
s.Require().NoError(err)

latestVersion := uint64(10)
Expand All @@ -45,7 +45,7 @@ func (s *CommitStoreTestSuite) TestStore_Snapshotter() {
kvPairs[storeKey] = append(kvPairs[storeKey], corestore.KVPair{Key: key, Value: value})
}
}
s.Require().NoError(commitStore.WriteBatch(corestore.NewChangesetWithPairs(kvPairs)))
s.Require().NoError(commitStore.WriteChangeset(corestore.NewChangesetWithPairs(kvPairs)))

_, err = commitStore.Commit(i)
s.Require().NoError(err)
Expand All @@ -64,7 +64,7 @@ func (s *CommitStoreTestSuite) TestStore_Snapshotter() {
},
}

targetStore, err := s.NewStore(dbm.NewMemDB(), storeKeys, nil, log.NewNopLogger())
targetStore, err := s.NewStore(dbm.NewMemDB(), storeKeys, log.NewNopLogger())
s.Require().NoError(err)

chunks := make(chan io.ReadCloser, kvCount*int(latestVersion))
Expand Down Expand Up @@ -129,7 +129,7 @@ func (s *CommitStoreTestSuite) TestStore_Pruning() {
KeepRecent: 10,
Interval: 5,
}
commitStore, err := s.NewStore(dbm.NewMemDB(), storeKeys, pruneOpts, log.NewNopLogger())
commitStore, err := s.NewStore(dbm.NewMemDB(), storeKeys, log.NewNopLogger())
s.Require().NoError(err)

latestVersion := uint64(100)
Expand All @@ -144,10 +144,15 @@ func (s *CommitStoreTestSuite) TestStore_Pruning() {
kvPairs[storeKey] = append(kvPairs[storeKey], corestore.KVPair{Key: key, Value: value})
}
}
s.Require().NoError(commitStore.WriteBatch(corestore.NewChangesetWithPairs(kvPairs)))
s.Require().NoError(commitStore.WriteChangeset(corestore.NewChangesetWithPairs(kvPairs)))

_, err = commitStore.Commit(i)
s.Require().NoError(err)

if prune, pruneVersion := pruneOpts.ShouldPrune(i); prune {
s.Require().NoError(commitStore.Prune(pruneVersion))
}

}

pruneVersion := latestVersion - pruneOpts.KeepRecent - 1
Expand Down
14 changes: 2 additions & 12 deletions store/v2/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,15 @@ type VersionedDatabase interface {

ApplyChangeset(version uint64, cs *corestore.Changeset) error

// Prune attempts to prune all versions up to and including the provided
// version argument. The operation should be idempotent. An error should be
// returned upon failure.
Prune(version uint64) error

// Close releases associated resources. It should NOT be idempotent. It must
// only be called once and any call after may panic.
io.Closer
}

// Committer defines an API for committing state.
type Committer interface {
// WriteBatch writes a batch of key-value pairs to the tree.
WriteBatch(cs *corestore.Changeset) error
// WriteChangeset writes the changeset to the commitment state.
WriteChangeset(cs *corestore.Changeset) error

// WorkingCommitInfo returns the CommitInfo for the working tree.
WorkingCommitInfo(version uint64) *proof.CommitInfo
Expand Down Expand Up @@ -62,11 +57,6 @@ type Committer interface {
// GetCommitInfo returns the CommitInfo for the given version.
GetCommitInfo(version uint64) (*proof.CommitInfo, error)

// Prune attempts to prune all versions up to and including the provided
// version argument. The operation should be idempotent. An error should be
// returned upon failure.
Prune(version uint64) error

// Close releases associated resources. It should NOT be idempotent. It must
// only be called once and any call after may panic.
io.Closer
Expand Down
2 changes: 1 addition & 1 deletion store/v2/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
cosmossdk.io/log v1.3.1
github.com/cockroachdb/pebble v1.1.0
github.com/cosmos/gogoproto v1.4.12
github.com/cosmos/iavl v1.1.4
github.com/cosmos/iavl v1.2.0
github.com/cosmos/ics23/go v0.10.0
github.com/google/btree v1.1.2
github.com/hashicorp/go-metrics v0.5.3
Expand Down
4 changes: 2 additions & 2 deletions store/v2/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ github.com/cosmos/cosmos-db v1.0.2 h1:hwMjozuY1OlJs/uh6vddqnk9j7VamLv+0DBlbEXbAK
github.com/cosmos/cosmos-db v1.0.2/go.mod h1:Z8IXcFJ9PqKK6BIsVOB3QXtkKoqUOp1vRvPT39kOXEA=
github.com/cosmos/gogoproto v1.4.12 h1:vB6Lbe/rtnYGjQuFxkPiPYiCybqFT8QvLipDZP8JpFE=
github.com/cosmos/gogoproto v1.4.12/go.mod h1:LnZob1bXRdUoqMMtwYlcR3wjiElmlC+FkjaZRv1/eLY=
github.com/cosmos/iavl v1.1.4 h1:Z0cVVjeQqOUp78/nWt/uhQy83vYluWlAMGQ4zbH9G34=
github.com/cosmos/iavl v1.1.4/go.mod h1:vCYmRQUJU1wwj0oRD3wMEtOM9sJNDP+GFMaXmIxZ/rU=
github.com/cosmos/iavl v1.2.0 h1:kVxTmjTh4k0Dh1VNL046v6BXqKziqMDzxo93oh3kOfM=
github.com/cosmos/iavl v1.2.0/go.mod h1:HidWWLVAtODJqFD6Hbne2Y0q3SdxByJepHUOeoH4LiI=
github.com/cosmos/ics23/go v0.10.0 h1:iXqLLgp2Lp+EdpIuwXTYIQU+AiHj9mOC2X9ab++bZDM=
github.com/cosmos/ics23/go v0.10.0/go.mod h1:ZfJSmng/TBNTBkFemHHHj5YY7VAU/MBU980F4VU1NG0=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
Expand Down
2 changes: 1 addition & 1 deletion store/v2/migration/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func (m *Manager) Sync() error {
return fmt.Errorf("failed to unmarshal changeset: %w", err)
}
if m.stateCommitment != nil {
if err := m.stateCommitment.WriteBatch(cs); err != nil {
if err := m.stateCommitment.WriteChangeset(cs); err != nil {
return fmt.Errorf("failed to write changeset to commitment: %w", err)
}
if _, err := m.stateCommitment.Commit(version); err != nil {
Expand Down
8 changes: 4 additions & 4 deletions store/v2/migration/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func setupMigrationManager(t *testing.T, noCommitStore bool) (*Manager, *commitm
multiTrees[storeKey] = iavl.NewIavlTree(prefixDB, log.NewNopLogger(), iavl.DefaultConfig())
}

commitStore, err := commitment.NewCommitStore(multiTrees, db, nil, log.NewNopLogger())
commitStore, err := commitment.NewCommitStore(multiTrees, db, log.NewNopLogger())
require.NoError(t, err)

snapshotsStore, err := snapshots.NewStore(t.TempDir())
Expand All @@ -38,7 +38,7 @@ func setupMigrationManager(t *testing.T, noCommitStore bool) (*Manager, *commitm

storageDB, err := pebbledb.New(t.TempDir())
require.NoError(t, err)
newStorageStore := storage.NewStorageStore(storageDB, nil, log.NewNopLogger()) // for store/v2
newStorageStore := storage.NewStorageStore(storageDB, log.NewNopLogger()) // for store/v2

db1 := dbm.NewMemDB()
multiTrees1 := make(map[string]commitment.Tree)
Expand All @@ -47,7 +47,7 @@ func setupMigrationManager(t *testing.T, noCommitStore bool) (*Manager, *commitm
multiTrees1[storeKey] = iavl.NewIavlTree(prefixDB, log.NewNopLogger(), iavl.DefaultConfig())
}

newCommitStore, err := commitment.NewCommitStore(multiTrees1, db1, nil, log.NewNopLogger()) // for store/v2
newCommitStore, err := commitment.NewCommitStore(multiTrees1, db1, log.NewNopLogger()) // for store/v2
require.NoError(t, err)
if noCommitStore {
newCommitStore = nil
Expand All @@ -71,7 +71,7 @@ func TestMigrateState(t *testing.T) {
cs.Add([]byte(storeKey), []byte(fmt.Sprintf("key-%d-%d", version, i)), []byte(fmt.Sprintf("value-%d-%d", version, i)), false)
}
}
require.NoError(t, orgCommitStore.WriteBatch(cs))
require.NoError(t, orgCommitStore.WriteChangeset(cs))
_, err := orgCommitStore.Commit(version)
require.NoError(t, err)
}
Expand Down
52 changes: 52 additions & 0 deletions store/v2/pruning/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Pruning Manager

The `pruning` package defines the `PruningManager` struct which is responsible for
pruning the state storage (SS) and the state commitment (SC) based on the current
height of the chain. The `PruneOptions` struct defines the configuration for pruning
and is passed to the `PruningManager` during initialization.

## Prune Options

The `PruneOptions` struct includes the following fields:

- `KeepRecent` (uint64): The number of recent heights to keep in the state.
- `Interval` (uint64): The interval of how often to prune the state. 0 means no pruning.

## Pausable Pruner

The `PausablePruner` interface defines the `PausePruning` method which is used to pause
cool-develope marked this conversation as resolved.
Show resolved Hide resolved
the pruning process. The `PruningManager` will check if the pruner is a `PausablePruner`
and call the `PausePruning` method before and after `Commit` to pause and resume pruning.
This is useful for when the pruning process is asynchronous and needs to be paused during
a commit to prevent parallel writes.

## Pruning Flow

```mermaid
sequenceDiagram
autonumber

participant A as RootStore
participant B as PruningManager
participant C as CommitmentStore
participant D as StorageStore

loop Commit
A->>B: SignalCommit(true, height)
alt SC is PausablePruner
B->>C: PausePruning(true)
else SS is PausablePruner
B->>D: PausePruing(true)
end
A->>C: Commit Changeset
A->>D: Write Changeset
A->>B: SignalCommit(false, height)
alt SC is PausablePruner
B->>C: PausePruning(false)
else SS is PausablePruner
B->>D: PausePruing(false)
end
B->>C: Prune(height)
B->>D: Prune(height)
end
```
cool-develope marked this conversation as resolved.
Show resolved Hide resolved
Loading
Loading