From b0f24b0e232fefa8feea896c19e1343f31ca82a1 Mon Sep 17 00:00:00 2001 From: Francesco4203 Date: Tue, 21 May 2024 10:37:39 +0700 Subject: [PATCH] IntermediateRoot: add flag for threshold to update concurrently Divide the root updating of stateObjects into goroutines if number of stateObjects is at least the threshold statedb_test.go/TestIntermediateUpdateConcurrently: add test to check if the states after processed with both options are identical --- cmd/utils/flags.go | 7 ++++ core/blockchain.go | 4 ++ core/state/statedb.go | 40 ++++++++++++++++++++ core/state/statedb_test.go | 76 ++++++++++++++++++++++++++++++++++++++ eth/backend.go | 21 ++++++----- eth/ethconfig/config.go | 3 ++ 6 files changed, 141 insertions(+), 10 deletions(-) diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index d13682d0c3..6e0f5eb579 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -1071,6 +1071,13 @@ var ( Usage: "List of mock bls public keys which are reflect 1:1 with mock.validators", Category: flags.MockCategory, } + + ConcurrentUpdateThresholdFlag = &cli.IntFlag{ + Name: "concurrencyupdatethreshold", + Usage: "The threshold of concurrent update", + Value: 0, // 0 disables concurrent updates (the default) + Category: flags.EthCategory, + } ) // MakeDataDir retrieves the currently requested data directory, terminating diff --git a/core/blockchain.go b/core/blockchain.go index 9d123fc9e8..17d066cfe7 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -139,6 +139,9 @@ type CacheConfig struct { TriesInMemory int // The number of tries is kept in memory before pruning SnapshotWait bool // Wait for snapshot construction on startup. 
TODO(karalabe): This is a dirty hack for testing, nuke it + + // Minimum stateObjects (updating accounts) to apply concurrent updates, 0 to disable + ConcurrentUpdateThreshold int } // defaultCacheConfig are the default caching values if none are specified by the @@ -1814,6 +1817,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er parent = bc.GetHeader(block.ParentHash(), block.NumberU64()-1) } statedb, err := state.New(parent.Root, bc.stateCache, bc.snaps) if err != nil { return it.index, err } + statedb.ConcurrentUpdateThreshold = bc.cacheConfig.ConcurrentUpdateThreshold diff --git a/core/state/statedb.go b/core/state/statedb.go index 014e8a0891..227a7a2d3c 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -21,7 +21,9 @@ import ( "errors" "fmt" "math/big" + "runtime" "sort" + "sync" "time" "github.com/ethereum/go-ethereum/common" @@ -122,6 +124,9 @@ type StateDB struct { StorageUpdated int AccountDeleted int StorageDeleted int + + // Minimum stateObjects (updating accounts) to apply concurrent updates, 0 to disable + ConcurrentUpdateThreshold int } // New creates a new state from a given trie. @@ -855,11 +860,46 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash { // the account prefetcher. Instead, let's process all the storage updates // first, giving the account prefeches just a few more milliseconds of time // to pull useful data from disk. 
+ + // Collect the pending, non-deleted state objects whose roots need updating + updateObjs := []*stateObject{} for addr := range s.stateObjectsPending { if obj := s.stateObjects[addr]; !obj.deleted { + updateObjs = append(updateObjs, obj) + } + } + + if len(updateObjs) < s.ConcurrentUpdateThreshold || s.ConcurrentUpdateThreshold == 0 { + // Below the threshold, or concurrency disabled (0): update sequentially + for _, obj := range updateObjs { obj.updateRoot(s.db) } + } else { + // Local helper returning the smaller of two ints + min := func(a, b int) int { + if a < b { + return a + } + return b + } + // Update in parallel with at most NumCPU goroutines, one contiguous chunk each. NOTE(review): confirm updateRoot is safe to call concurrently + nRoutines := min(runtime.NumCPU(), len(updateObjs)) + if nRoutines != 0 { + nObjPerRoutine := (len(updateObjs) + nRoutines - 1) / nRoutines + wg := sync.WaitGroup{} + for i := 0; i < len(updateObjs); i += nObjPerRoutine { + wg.Add(1) // count goroutines actually started: rounding the chunk size up can yield fewer than nRoutines chunks + go func(objs []*stateObject) { + defer wg.Done() + for _, obj := range objs { + obj.updateRoot(s.db) + } + }(updateObjs[i:min(i+nObjPerRoutine, len(updateObjs))]) + } + wg.Wait() + } } + // Now we're about to start to write changes to the trie. The trie is so far // _untouched_. We can check with the prefetcher, if it can give us a trie // which has the same root, but also has some content loaded into it. 
diff --git a/core/state/statedb_test.go b/core/state/statedb_test.go index e9576d4dc4..69afe80e08 100644 --- a/core/state/statedb_test.go +++ b/core/state/statedb_test.go @@ -28,6 +28,7 @@ import ( "sync" "testing" "testing/quick" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/rawdb" @@ -915,3 +916,78 @@ func TestStateDBAccessList(t *testing.T) { t.Fatalf("expected empty, got %d", got) } } + +func TestIntermediateUpdateConcurrently(t *testing.T) { + rng := rand.New(rand.NewSource(time.Now().Unix())) + // Create two empty states, each backed by its own in-memory database + db1 := rawdb.NewMemoryDatabase() + db2 := rawdb.NewMemoryDatabase() + state1, _ := New(common.Hash{}, NewDatabase(db1), nil) + state2, _ := New(common.Hash{}, NewDatabase(db2), nil) + + // Apply the same random balance, nonce, storage slot and code to both states + for i := int64(0); i < 1000; i++ { + addr := common.BigToAddress(big.NewInt(i)) + balance := big.NewInt(int64(rng.Int63())) + nonce := rng.Uint64() + key := common.BigToHash(big.NewInt(int64(rng.Int63()))) + value := common.BigToHash(big.NewInt(int64(rng.Int63()))) + code := []byte{byte(rng.Uint64()), byte(rng.Uint64()), byte(rng.Uint64())} + state1.SetBalance(addr, balance) + state2.SetBalance(addr, balance) + state1.SetNonce(addr, nonce) + state2.SetNonce(addr, nonce) + state1.SetState(addr, key, value) + state2.SetState(addr, key, value) + state1.SetCode(addr, code) + state2.SetCode(addr, code) + } + + state1.ConcurrentUpdateThreshold = 0 + state2.ConcurrentUpdateThreshold = 1 + + state1.IntermediateRoot(false) // sequential + state2.IntermediateRoot(false) // concurrent + + root1, err1 := state1.Commit(false) + root2, err2 := state2.Commit(false) + + if err1 != nil { + t.Fatalf("sequential commit failed: %v", err1) + } + if err1 = state1.Database().TrieDB().Commit(root1, false, nil); err1 != nil { + t.Errorf("cannot commit trie %v to persistent database", root1.Hex()) + } + if err2 != nil { + t.Fatalf("concurrent commit failed: %v", err2) + } + if err2 = 
state2.Database().TrieDB().Commit(root2, false, nil); err2 != nil { + t.Errorf("cannot commit trie %v to persistent database", root2.Hex()) + } + + it1 := db1.NewIterator(nil, nil) + it2 := db2.NewIterator(nil, nil) + for it1.Next() { + if !it2.Next() { + t.Fatalf("concurrent iterator ended prematurely") + } + if !bytes.Equal(it1.Key(), it2.Key()) { + t.Fatalf("concurrent iterator key mismatch: %x != %x", it1.Key(), it2.Key()) + } + if !bytes.Equal(it1.Value(), it2.Value()) { + t.Fatalf("concurrent iterator value mismatch: %x != %x", it1.Value(), it2.Value()) + } + } + if it1.Error() != nil { + t.Fatalf("sequential iterator error: %v", it1.Error()) + } + if it2.Error() != nil { + t.Fatalf("concurrent iterator error: %v", it2.Error()) + } + if it1.Next() { + t.Fatalf("sequential iterator has extra data") + } + if it2.Next() { + t.Fatalf("concurrent iterator has extra data") + } +} diff --git a/eth/backend.go b/eth/backend.go index ed1132bcc2..a6f4bc371a 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -187,16 +187,17 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { EnablePreimageRecording: config.EnablePreimageRecording, } cacheConfig = &core.CacheConfig{ - TrieCleanLimit: config.TrieCleanCache, - TrieCleanJournal: stack.ResolvePath(config.TrieCleanCacheJournal), - TrieCleanRejournal: config.TrieCleanCacheRejournal, - TrieCleanNoPrefetch: config.NoPrefetch, - TrieDirtyLimit: config.TrieDirtyCache, - TrieDirtyDisabled: config.NoPruning, - TrieTimeLimit: config.TrieTimeout, - SnapshotLimit: config.SnapshotCache, - Preimages: config.Preimages, - TriesInMemory: config.TriesInMemory, + TrieCleanLimit: config.TrieCleanCache, + TrieCleanJournal: stack.ResolvePath(config.TrieCleanCacheJournal), + TrieCleanRejournal: config.TrieCleanCacheRejournal, + TrieCleanNoPrefetch: config.NoPrefetch, + TrieDirtyLimit: config.TrieDirtyCache, + TrieDirtyDisabled: config.NoPruning, + TrieTimeLimit: config.TrieTimeout, + 
SnapshotLimit: config.SnapshotCache, + Preimages: config.Preimages, + TriesInMemory: config.TriesInMemory, + ConcurrentUpdateThreshold: config.ConcurrentUpdateThreshold, } ) eth.blockchain, err = core.NewBlockChain(chainDb, cacheConfig, chainConfig, eth.engine, vmConfig, eth.shouldPreserve, &config.TxLookupLimit) diff --git a/eth/ethconfig/config.go b/eth/ethconfig/config.go index b9a999e62b..e486f915b0 100644 --- a/eth/ethconfig/config.go +++ b/eth/ethconfig/config.go @@ -222,6 +222,9 @@ type Config struct { // Send additional chain event EnableAdditionalChainEvent bool + + // Minimum stateObjects (updating accounts) to apply concurrent updates, 0 to disable + ConcurrentUpdateThreshold int } // CreateConsensusEngine creates a consensus engine for the given chain configuration.