Skip to content

Commit

Permalink
feat(store/v2): pruning manager (#20430)
Browse files Browse the repository at this point in the history
  • Loading branch information
cool-develope authored May 29, 2024
1 parent c7dc915 commit b8c8482
Show file tree
Hide file tree
Showing 23 changed files with 436 additions and 134 deletions.
17 changes: 15 additions & 2 deletions store/v2/commitment/iavl/tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,15 @@ import (

"cosmossdk.io/core/log"
corestore "cosmossdk.io/core/store"
"cosmossdk.io/store/v2"
"cosmossdk.io/store/v2/commitment"
dbm "cosmossdk.io/store/v2/db"
)

var _ commitment.Tree = (*IavlTree)(nil)
var (
_ commitment.Tree = (*IavlTree)(nil)
_ store.PausablePruner = (*IavlTree)(nil)
)

// IavlTree is a wrapper around iavl.MutableTree.
type IavlTree struct {
Expand All @@ -21,7 +25,7 @@ type IavlTree struct {

// NewIavlTree creates a new IavlTree instance.
func NewIavlTree(db corestore.KVStoreWithBatch, logger log.Logger, cfg *Config) *IavlTree {
tree := iavl.NewMutableTree(dbm.NewWrapper(db), cfg.CacheSize, cfg.SkipFastStorageUpgrade, logger)
tree := iavl.NewMutableTree(dbm.NewWrapper(db), cfg.CacheSize, cfg.SkipFastStorageUpgrade, logger, iavl.AsyncPruningOption(true))
return &IavlTree{
tree: tree,
}
Expand Down Expand Up @@ -98,6 +102,15 @@ func (t *IavlTree) Prune(version uint64) error {
return t.tree.DeleteVersionsTo(int64(version))
}

// PausePruning pauses the pruning process.
func (t *IavlTree) PausePruning(pause bool) {
if pause {
t.tree.SetCommitting()
} else {
t.tree.UnsetCommitting()
}
}

// Export exports the tree exporter at the given version.
func (t *IavlTree) Export(version uint64) (commitment.Exporter, error) {
tree, err := t.tree.GetImmutable(int64(version))
Expand Down
16 changes: 11 additions & 5 deletions store/v2/commitment/iavl/tree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,27 @@ package iavl

import (
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"

"cosmossdk.io/core/log"
corestore "cosmossdk.io/core/store"
"cosmossdk.io/store/v2"
"cosmossdk.io/store/v2/commitment"
dbm "cosmossdk.io/store/v2/db"
)

func TestCommitterSuite(t *testing.T) {
s := &commitment.CommitStoreTestSuite{
NewStore: func(db corestore.KVStoreWithBatch, storeKeys []string, pruneOpts *store.PruneOptions, logger log.Logger) (*commitment.CommitStore, error) {
NewStore: func(db corestore.KVStoreWithBatch, storeKeys []string, logger log.Logger) (*commitment.CommitStore, error) {
multiTrees := make(map[string]commitment.Tree)
cfg := DefaultConfig()
for _, storeKey := range storeKeys {
prefixDB := dbm.NewPrefixDB(db, []byte(storeKey))
multiTrees[storeKey] = NewIavlTree(prefixDB, logger, cfg)
}
return commitment.NewCommitStore(multiTrees, db, pruneOpts, logger)
return commitment.NewCommitStore(multiTrees, db, logger)
},
}

Expand Down Expand Up @@ -100,8 +100,14 @@ func TestIavlTree(t *testing.T) {
err = tree.Prune(1)
require.NoError(t, err)
require.Equal(t, uint64(3), tree.GetLatestVersion())
err = tree.LoadVersion(1)
require.Error(t, err)
// async pruning check
checkErr := func() bool {
if _, err := tree.tree.LoadVersion(1); err != nil {
return true
}
return false
}
require.Eventually(t, checkErr, 2*time.Second, 100*time.Millisecond)

// load version 2
err = tree.LoadVersion(2)
Expand Down
36 changes: 16 additions & 20 deletions store/v2/commitment/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const (
var (
_ store.Committer = (*CommitStore)(nil)
_ snapshots.CommitSnapshotter = (*CommitStore)(nil)
_ store.PausablePruner = (*CommitStore)(nil)
)

// CommitStore is a wrapper around multiple Tree objects mapped by a unique store
Expand All @@ -39,26 +40,18 @@ type CommitStore struct {
logger log.Logger
db corestore.KVStoreWithBatch
multiTrees map[string]Tree

// pruneOptions is the pruning configuration.
pruneOptions *store.PruneOptions // TODO are there no default prune options?
}

// NewCommitStore creates a new CommitStore instance.
func NewCommitStore(trees map[string]Tree, db corestore.KVStoreWithBatch, pruneOpts *store.PruneOptions, logger log.Logger) (*CommitStore, error) {
if pruneOpts == nil {
pruneOpts = store.DefaultPruneOptions()
}

func NewCommitStore(trees map[string]Tree, db corestore.KVStoreWithBatch, logger log.Logger) (*CommitStore, error) {
return &CommitStore{
logger: logger,
db: db,
multiTrees: trees,
pruneOptions: pruneOpts,
logger: logger,
db: db,
multiTrees: trees,
}, nil
}

func (c *CommitStore) WriteBatch(cs *corestore.Changeset) error {
func (c *CommitStore) WriteChangeset(cs *corestore.Changeset) error {
for _, pairs := range cs.Changes {

key := conv.UnsafeBytesToStr(pairs.Actor)
Expand Down Expand Up @@ -237,13 +230,6 @@ func (c *CommitStore) Commit(version uint64) (*proof.CommitInfo, error) {
return nil, err
}

// Prune the old versions.
if prune, pruneVersion := c.pruneOptions.ShouldPrune(version); prune {
if err := c.Prune(pruneVersion); err != nil {
c.logger.Info("failed to prune SC", "prune_version", pruneVersion, "err", err)
}
}

return cInfo, nil
}

Expand Down Expand Up @@ -297,6 +283,7 @@ func (c *CommitStore) Get(storeKey []byte, version uint64, key []byte) ([]byte,
return bz, nil
}

// Prune implements store.Pruner.
func (c *CommitStore) Prune(version uint64) (ferr error) {
// prune the metadata
batch := c.db.NewBatch()
Expand All @@ -322,6 +309,15 @@ func (c *CommitStore) Prune(version uint64) (ferr error) {
return ferr
}

// PausePruning implements store.PausablePruner.
func (c *CommitStore) PausePruning(pause bool) {
for _, tree := range c.multiTrees {
if pruner, ok := tree.(store.PausablePruner); ok {
pruner.PausePruning(pause)
}
}
}

// Snapshot implements snapshotstypes.CommitSnapshotter.
func (c *CommitStore) Snapshot(version uint64, protoWriter protoio.Writer) error {
if version == 0 {
Expand Down
17 changes: 11 additions & 6 deletions store/v2/commitment/store_test_suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ const (
type CommitStoreTestSuite struct {
suite.Suite

NewStore func(db corestore.KVStoreWithBatch, storeKeys []string, pruneOpts *store.PruneOptions, logger log.Logger) (*CommitStore, error)
NewStore func(db corestore.KVStoreWithBatch, storeKeys []string, logger log.Logger) (*CommitStore, error)
}

func (s *CommitStoreTestSuite) TestStore_Snapshotter() {
storeKeys := []string{storeKey1, storeKey2}
commitStore, err := s.NewStore(dbm.NewMemDB(), storeKeys, nil, log.NewNopLogger())
commitStore, err := s.NewStore(dbm.NewMemDB(), storeKeys, log.NewNopLogger())
s.Require().NoError(err)

latestVersion := uint64(10)
Expand All @@ -45,7 +45,7 @@ func (s *CommitStoreTestSuite) TestStore_Snapshotter() {
kvPairs[storeKey] = append(kvPairs[storeKey], corestore.KVPair{Key: key, Value: value})
}
}
s.Require().NoError(commitStore.WriteBatch(corestore.NewChangesetWithPairs(kvPairs)))
s.Require().NoError(commitStore.WriteChangeset(corestore.NewChangesetWithPairs(kvPairs)))

_, err = commitStore.Commit(i)
s.Require().NoError(err)
Expand All @@ -64,7 +64,7 @@ func (s *CommitStoreTestSuite) TestStore_Snapshotter() {
},
}

targetStore, err := s.NewStore(dbm.NewMemDB(), storeKeys, nil, log.NewNopLogger())
targetStore, err := s.NewStore(dbm.NewMemDB(), storeKeys, log.NewNopLogger())
s.Require().NoError(err)

chunks := make(chan io.ReadCloser, kvCount*int(latestVersion))
Expand Down Expand Up @@ -129,7 +129,7 @@ func (s *CommitStoreTestSuite) TestStore_Pruning() {
KeepRecent: 10,
Interval: 5,
}
commitStore, err := s.NewStore(dbm.NewMemDB(), storeKeys, pruneOpts, log.NewNopLogger())
commitStore, err := s.NewStore(dbm.NewMemDB(), storeKeys, log.NewNopLogger())
s.Require().NoError(err)

latestVersion := uint64(100)
Expand All @@ -144,10 +144,15 @@ func (s *CommitStoreTestSuite) TestStore_Pruning() {
kvPairs[storeKey] = append(kvPairs[storeKey], corestore.KVPair{Key: key, Value: value})
}
}
s.Require().NoError(commitStore.WriteBatch(corestore.NewChangesetWithPairs(kvPairs)))
s.Require().NoError(commitStore.WriteChangeset(corestore.NewChangesetWithPairs(kvPairs)))

_, err = commitStore.Commit(i)
s.Require().NoError(err)

if prune, pruneVersion := pruneOpts.ShouldPrune(i); prune {
s.Require().NoError(commitStore.Prune(pruneVersion))
}

}

pruneVersion := latestVersion - pruneOpts.KeepRecent - 1
Expand Down
14 changes: 2 additions & 12 deletions store/v2/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,15 @@ type VersionedDatabase interface {

ApplyChangeset(version uint64, cs *corestore.Changeset) error

// Prune attempts to prune all versions up to and including the provided
// version argument. The operation should be idempotent. An error should be
// returned upon failure.
Prune(version uint64) error

// Close releases associated resources. It should NOT be idempotent. It must
// only be called once and any call after may panic.
io.Closer
}

// Committer defines an API for committing state.
type Committer interface {
// WriteBatch writes a batch of key-value pairs to the tree.
WriteBatch(cs *corestore.Changeset) error
// WriteChangeset writes the changeset to the commitment state.
WriteChangeset(cs *corestore.Changeset) error

// WorkingCommitInfo returns the CommitInfo for the working tree.
WorkingCommitInfo(version uint64) *proof.CommitInfo
Expand Down Expand Up @@ -62,11 +57,6 @@ type Committer interface {
// GetCommitInfo returns the CommitInfo for the given version.
GetCommitInfo(version uint64) (*proof.CommitInfo, error)

// Prune attempts to prune all versions up to and including the provided
// version argument. The operation should be idempotent. An error should be
// returned upon failure.
Prune(version uint64) error

// Close releases associated resources. It should NOT be idempotent. It must
// only be called once and any call after may panic.
io.Closer
Expand Down
2 changes: 1 addition & 1 deletion store/v2/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
cosmossdk.io/log v1.3.1
github.com/cockroachdb/pebble v1.1.0
github.com/cosmos/gogoproto v1.4.12
github.com/cosmos/iavl v1.1.4
github.com/cosmos/iavl v1.2.0
github.com/cosmos/ics23/go v0.10.0
github.com/google/btree v1.1.2
github.com/hashicorp/go-metrics v0.5.3
Expand Down
4 changes: 2 additions & 2 deletions store/v2/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ github.com/cosmos/cosmos-db v1.0.2 h1:hwMjozuY1OlJs/uh6vddqnk9j7VamLv+0DBlbEXbAK
github.com/cosmos/cosmos-db v1.0.2/go.mod h1:Z8IXcFJ9PqKK6BIsVOB3QXtkKoqUOp1vRvPT39kOXEA=
github.com/cosmos/gogoproto v1.4.12 h1:vB6Lbe/rtnYGjQuFxkPiPYiCybqFT8QvLipDZP8JpFE=
github.com/cosmos/gogoproto v1.4.12/go.mod h1:LnZob1bXRdUoqMMtwYlcR3wjiElmlC+FkjaZRv1/eLY=
github.com/cosmos/iavl v1.1.4 h1:Z0cVVjeQqOUp78/nWt/uhQy83vYluWlAMGQ4zbH9G34=
github.com/cosmos/iavl v1.1.4/go.mod h1:vCYmRQUJU1wwj0oRD3wMEtOM9sJNDP+GFMaXmIxZ/rU=
github.com/cosmos/iavl v1.2.0 h1:kVxTmjTh4k0Dh1VNL046v6BXqKziqMDzxo93oh3kOfM=
github.com/cosmos/iavl v1.2.0/go.mod h1:HidWWLVAtODJqFD6Hbne2Y0q3SdxByJepHUOeoH4LiI=
github.com/cosmos/ics23/go v0.10.0 h1:iXqLLgp2Lp+EdpIuwXTYIQU+AiHj9mOC2X9ab++bZDM=
github.com/cosmos/ics23/go v0.10.0/go.mod h1:ZfJSmng/TBNTBkFemHHHj5YY7VAU/MBU980F4VU1NG0=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
Expand Down
2 changes: 1 addition & 1 deletion store/v2/migration/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func (m *Manager) Sync() error {
return fmt.Errorf("failed to unmarshal changeset: %w", err)
}
if m.stateCommitment != nil {
if err := m.stateCommitment.WriteBatch(cs); err != nil {
if err := m.stateCommitment.WriteChangeset(cs); err != nil {
return fmt.Errorf("failed to write changeset to commitment: %w", err)
}
if _, err := m.stateCommitment.Commit(version); err != nil {
Expand Down
8 changes: 4 additions & 4 deletions store/v2/migration/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func setupMigrationManager(t *testing.T, noCommitStore bool) (*Manager, *commitm
multiTrees[storeKey] = iavl.NewIavlTree(prefixDB, log.NewNopLogger(), iavl.DefaultConfig())
}

commitStore, err := commitment.NewCommitStore(multiTrees, db, nil, log.NewNopLogger())
commitStore, err := commitment.NewCommitStore(multiTrees, db, log.NewNopLogger())
require.NoError(t, err)

snapshotsStore, err := snapshots.NewStore(t.TempDir())
Expand All @@ -38,7 +38,7 @@ func setupMigrationManager(t *testing.T, noCommitStore bool) (*Manager, *commitm

storageDB, err := pebbledb.New(t.TempDir())
require.NoError(t, err)
newStorageStore := storage.NewStorageStore(storageDB, nil, log.NewNopLogger()) // for store/v2
newStorageStore := storage.NewStorageStore(storageDB, log.NewNopLogger()) // for store/v2

db1 := dbm.NewMemDB()
multiTrees1 := make(map[string]commitment.Tree)
Expand All @@ -47,7 +47,7 @@ func setupMigrationManager(t *testing.T, noCommitStore bool) (*Manager, *commitm
multiTrees1[storeKey] = iavl.NewIavlTree(prefixDB, log.NewNopLogger(), iavl.DefaultConfig())
}

newCommitStore, err := commitment.NewCommitStore(multiTrees1, db1, nil, log.NewNopLogger()) // for store/v2
newCommitStore, err := commitment.NewCommitStore(multiTrees1, db1, log.NewNopLogger()) // for store/v2
require.NoError(t, err)
if noCommitStore {
newCommitStore = nil
Expand All @@ -71,7 +71,7 @@ func TestMigrateState(t *testing.T) {
cs.Add([]byte(storeKey), []byte(fmt.Sprintf("key-%d-%d", version, i)), []byte(fmt.Sprintf("value-%d-%d", version, i)), false)
}
}
require.NoError(t, orgCommitStore.WriteBatch(cs))
require.NoError(t, orgCommitStore.WriteChangeset(cs))
_, err := orgCommitStore.Commit(version)
require.NoError(t, err)
}
Expand Down
52 changes: 52 additions & 0 deletions store/v2/pruning/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Pruning Manager

The `pruning` package defines the `PruningManager` struct which is responsible for
pruning the state storage (SS) and the state commitment (SC) based on the current
height of the chain. The `PruneOptions` struct defines the configuration for pruning
and is passed to the `PruningManager` during initialization.

## Prune Options

The `PruneOptions` struct includes the following fields:

* `KeepRecent` (uint64): The number of recent heights to keep in the state.
* `Interval` (uint64): The interval of how often to prune the state. 0 means no pruning.

## Pausable Pruner

The `PausablePruner` interface defines the `PausePruning` method, which is used to pause
the pruning process. The `PruningManager` will check if the pruner is a `PausablePruner`
and call the `PausePruning` method before and after `Commit` to pause and resume pruning.
This is useful for when the pruning process is asynchronous and needs to be paused during
a commit to prevent parallel writes.

## Pruning Flow

```mermaid
sequenceDiagram
autonumber
participant A as RootStore
participant B as PruningManager
participant C as CommitmentStore
participant D as StorageStore
loop Commit
A->>B: SignalCommit(true, height)
alt SC is PausablePruner
B->>C: PausePruning(true)
else SS is PausablePruner
B->>D: PausePruing(true)
end
A->>C: Commit Changeset
A->>D: Write Changeset
A->>B: SignalCommit(false, height)
alt SC is PausablePruner
B->>C: PausePruning(false)
else SS is PausablePruner
B->>D: PausePruing(false)
end
B->>C: Prune(height)
B->>D: Prune(height)
end
```
Loading

0 comments on commit b8c8482

Please sign in to comment.