Skip to content

Commit

Permalink
IntermediateRoot: add flag for threshold to update concurrently
Browse files Browse the repository at this point in the history
Divide the root updating of stateObjects into goroutines if number of stateObjects is at least the threshold
statedb_test.go/TestIntermediateUpdateConcurrently: add test to check if the states after processed with both options are identical
  • Loading branch information
Francesco4203 committed May 21, 2024
1 parent 9e55df7 commit b0f24b0
Show file tree
Hide file tree
Showing 6 changed files with 141 additions and 10 deletions.
7 changes: 7 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -1071,6 +1071,13 @@ var (
Usage: "List of mock bls public keys which are reflect 1:1 with mock.validators",
Category: flags.MockCategory,
}

ConcurrentUpdateThresholdFlag = &cli.IntFlag{
Name: "concurrencyupdatethreashold",
Usage: "The threshold of concurrent update",
Value: 0, // disable concurrent update by default
Category: flags.EthCategory,
}
)

// MakeDataDir retrieves the currently requested data directory, terminating
Expand Down
4 changes: 4 additions & 0 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,9 @@ type CacheConfig struct {
TriesInMemory int // The number of tries is kept in memory before pruning

SnapshotWait bool // Wait for snapshot construction on startup. TODO(karalabe): This is a dirty hack for testing, nuke it

// Minimum stateObjects (updating accounts) to apply concurrent updates, 0 to disable
ConcurrentUpdateThreshold int
}

// defaultCacheConfig are the default caching values if none are specified by the
Expand Down Expand Up @@ -1814,6 +1817,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
parent = bc.GetHeader(block.ParentHash(), block.NumberU64()-1)
}
statedb, err := state.New(parent.Root, bc.stateCache, bc.snaps)
statedb.ConcurrentUpdateThreshold = bc.cacheConfig.ConcurrentUpdateThreshold
if err != nil {
return it.index, err
}
Expand Down
40 changes: 40 additions & 0 deletions core/state/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import (
"errors"
"fmt"
"math/big"
"runtime"
"sort"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -122,6 +124,9 @@ type StateDB struct {
StorageUpdated int
AccountDeleted int
StorageDeleted int

// Minimum stateObjects (updating accounts) to apply concurrent updates, 0 to disable
ConcurrentUpdateThreshold int
}

// New creates a new state from a given trie.
Expand Down Expand Up @@ -855,11 +860,46 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash {
// the account prefetcher. Instead, let's process all the storage updates
// first, giving the account prefeches just a few more milliseconds of time
// to pull useful data from disk.

// Get the stateObjects needed to be updated
updateObjs := []*stateObject{}
for addr := range s.stateObjectsPending {
if obj := s.stateObjects[addr]; !obj.deleted {
updateObjs = append(updateObjs, obj)
}
}

if len(updateObjs) < s.ConcurrentUpdateThreshold || s.ConcurrentUpdateThreshold == 0 {
// Update the state objects sequentially
for _, obj := range updateObjs {
obj.updateRoot(s.db)
}
} else {
// Declare min function
min := func(a, b int) int {
if a < b {
return a
}
return b
}
// Update the state objects using goroutines, with maximum of NumCPU goroutines
nRoutines := min(runtime.NumCPU(), len(updateObjs))
if nRoutines != 0 {
nObjPerRoutine := (len(updateObjs) + nRoutines - 1) / nRoutines
wg := sync.WaitGroup{}
wg.Add(nRoutines)
for i := 0; i < len(updateObjs); i += nObjPerRoutine {
go func(objs []*stateObject) {
defer wg.Done()
for _, obj := range objs {
obj.updateRoot(s.db)
}
}(updateObjs[i:min(i+nObjPerRoutine, len(updateObjs))])
}
wg.Wait()
}
}

// Now we're about to start to write changes to the trie. The trie is so far
// _untouched_. We can check with the prefetcher, if it can give us a trie
// which has the same root, but also has some content loaded into it.
Expand Down
76 changes: 76 additions & 0 deletions core/state/statedb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"sync"
"testing"
"testing/quick"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
Expand Down Expand Up @@ -915,3 +916,78 @@ func TestStateDBAccessList(t *testing.T) {
t.Fatalf("expected empty, got %d", got)
}
}

func TestIntermediateUpdateConcurrently(t *testing.T) {
rng := rand.New(rand.NewSource(time.Now().Unix()))
// Create an empty state
db1 := rawdb.NewMemoryDatabase()
db2 := rawdb.NewMemoryDatabase()
state1, _ := New(common.Hash{}, NewDatabase(db1), nil)
state2, _ := New(common.Hash{}, NewDatabase(db2), nil)

// Update it with random data
for i := int64(0); i < 1000; i++ {
addr := common.BigToAddress(big.NewInt(i))
balance := big.NewInt(int64(rng.Int63()))
nonce := rng.Uint64()
key := common.BigToHash(big.NewInt(int64(rng.Int63())))
value := common.BigToHash(big.NewInt(int64(rng.Int63())))
code := []byte{byte(rng.Uint64()), byte(rng.Uint64()), byte(rng.Uint64())}
state1.SetBalance(addr, balance)
state2.SetBalance(addr, balance)
state1.SetNonce(addr, nonce)
state2.SetNonce(addr, nonce)
state1.SetState(addr, key, value)
state2.SetState(addr, key, value)
state1.SetCode(addr, code)
state2.SetCode(addr, code)
}

state1.ConcurrentUpdateThreshold = 0
state2.ConcurrentUpdateThreshold = 1

state1.IntermediateRoot(false) // sequential
state2.IntermediateRoot(false) // concurrent

root1, err1 := state1.Commit(false)
root2, err2 := state2.Commit(false)

if err1 != nil {
t.Fatalf("sequential commit failed: %v", err1)
}
if err1 = state1.Database().TrieDB().Commit(root1, false, nil); err1 != nil {
t.Errorf("cannot commit trie %v to persistent database", root1.Hex())
}
if err2 != nil {
t.Fatalf("concurrent commit failed: %v", err2)
}
if err2 = state2.Database().TrieDB().Commit(root2, false, nil); err2 != nil {
t.Errorf("cannot commit trie %v to persistent database", root2.Hex())
}

it1 := db1.NewIterator(nil, nil)
it2 := db2.NewIterator(nil, nil)
for it1.Next() {
if !it2.Next() {
t.Fatalf("concurrent iterator ended prematurely")
}
if !bytes.Equal(it1.Key(), it2.Key()) {
t.Fatalf("concurrent iterator key mismatch: " + string(it1.Key()) + " != " + string(it2.Key()))
}
if !bytes.Equal(it1.Value(), it2.Value()) {
t.Fatalf("concurrent iterator value mismatch: " + string(it1.Value()) + " != " + string(it2.Value()))
}
}
if it1.Error() != nil {
t.Fatalf("sequential iterator error: %v", it1.Error())
}
if it2.Error() != nil {
t.Fatalf("concurrent iterator error: %v", it2.Error())
}
if it1.Next() {
t.Fatalf("sequential iterator has extra data")
}
if it2.Next() {
t.Fatalf("concurrent iterator has extra data")
}
}
21 changes: 11 additions & 10 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,16 +187,17 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
EnablePreimageRecording: config.EnablePreimageRecording,
}
cacheConfig = &core.CacheConfig{
TrieCleanLimit: config.TrieCleanCache,
TrieCleanJournal: stack.ResolvePath(config.TrieCleanCacheJournal),
TrieCleanRejournal: config.TrieCleanCacheRejournal,
TrieCleanNoPrefetch: config.NoPrefetch,
TrieDirtyLimit: config.TrieDirtyCache,
TrieDirtyDisabled: config.NoPruning,
TrieTimeLimit: config.TrieTimeout,
SnapshotLimit: config.SnapshotCache,
Preimages: config.Preimages,
TriesInMemory: config.TriesInMemory,
TrieCleanLimit: config.TrieCleanCache,
TrieCleanJournal: stack.ResolvePath(config.TrieCleanCacheJournal),
TrieCleanRejournal: config.TrieCleanCacheRejournal,
TrieCleanNoPrefetch: config.NoPrefetch,
TrieDirtyLimit: config.TrieDirtyCache,
TrieDirtyDisabled: config.NoPruning,
TrieTimeLimit: config.TrieTimeout,
SnapshotLimit: config.SnapshotCache,
Preimages: config.Preimages,
TriesInMemory: config.TriesInMemory,
ConcurrentUpdateThreshold: config.ConcurrentUpdateThreshold,
}
)
eth.blockchain, err = core.NewBlockChain(chainDb, cacheConfig, chainConfig, eth.engine, vmConfig, eth.shouldPreserve, &config.TxLookupLimit)
Expand Down
3 changes: 3 additions & 0 deletions eth/ethconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,9 @@ type Config struct {

// Send additional chain event
EnableAdditionalChainEvent bool

// Minimum stateObjects (updating accounts) to apply concurrent updates, 0 to disable
ConcurrentUpdateThreshold int
}

// CreateConsensusEngine creates a consensus engine for the given chain configuration.
Expand Down

0 comments on commit b0f24b0

Please sign in to comment.