Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IntermediateRoot: add flag for threshold to update concurrently #453

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/ronin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ var (
utils.DisableRoninProtocol,
utils.AdditionalChainEventFlag,
utils.DBEngineFlag,
utils.ConcurrentUpdateThresholdFlag,
}

rpcFlags = []cli.Flag{
Expand Down
26 changes: 19 additions & 7 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -1073,6 +1073,13 @@ var (
Usage: "List of mock bls public keys which are reflect 1:1 with mock.validators",
Category: flags.MockCategory,
}

ConcurrentUpdateThresholdFlag = &cli.IntFlag{
Name: "concurrent-update-threshold",
Usage: "The threshold of concurrent update",
Value: 0, // disable concurrent update by default
Category: flags.EthCategory,
}
)

// MakeDataDir retrieves the currently requested data directory, terminating
Expand Down Expand Up @@ -2029,6 +2036,10 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
if ctx.Bool(MonitorFinalityVoteFlag.Name) {
cfg.EnableMonitorFinalityVote = true
}
// Set concurrent update threshold
if ctx.IsSet(ConcurrentUpdateThresholdFlag.Name) {
cfg.ConcurrentUpdateThreshold = ctx.Int(ConcurrentUpdateThresholdFlag.Name)
}
}

// SetDNSDiscoveryDefaults configures DNS discovery with the given URL if
Expand Down Expand Up @@ -2250,13 +2261,14 @@ func MakeChain(ctx *cli.Context, stack *node.Node) (chain *core.BlockChain, chai
Fatalf("--%s must be either 'full' or 'archive'", GCModeFlag.Name)
}
cache := &core.CacheConfig{
TrieCleanLimit: ethconfig.Defaults.TrieCleanCache,
TrieCleanNoPrefetch: ctx.Bool(CacheNoPrefetchFlag.Name),
TrieDirtyLimit: ethconfig.Defaults.TrieDirtyCache,
TrieDirtyDisabled: ctx.String(GCModeFlag.Name) == "archive",
TrieTimeLimit: ethconfig.Defaults.TrieTimeout,
SnapshotLimit: ethconfig.Defaults.SnapshotCache,
Preimages: ctx.Bool(CachePreimagesFlag.Name),
TrieCleanLimit: ethconfig.Defaults.TrieCleanCache,
TrieCleanNoPrefetch: ctx.Bool(CacheNoPrefetchFlag.Name),
TrieDirtyLimit: ethconfig.Defaults.TrieDirtyCache,
TrieDirtyDisabled: ctx.String(GCModeFlag.Name) == "archive",
TrieTimeLimit: ethconfig.Defaults.TrieTimeout,
SnapshotLimit: ethconfig.Defaults.SnapshotCache,
Preimages: ctx.Bool(CachePreimagesFlag.Name),
ConcurrentUpdateThreshold: ctx.Int(ConcurrentUpdateThresholdFlag.Name),
}
if cache.TrieDirtyDisabled && !cache.Preimages {
cache.Preimages = true
Expand Down
4 changes: 4 additions & 0 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,9 @@ type CacheConfig struct {
TriesInMemory int // The number of tries is kept in memory before pruning

SnapshotWait bool // Wait for snapshot construction on startup. TODO(karalabe): This is a dirty hack for testing, nuke it

// Minimum stateObjects (updating accounts) to apply concurrent updates, 0 to disable
ConcurrentUpdateThreshold int
}

// defaultCacheConfig are the default caching values if none are specified by the
Expand Down Expand Up @@ -1814,6 +1817,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
parent = bc.GetHeader(block.ParentHash(), block.NumberU64()-1)
}
statedb, err := state.New(parent.Root, bc.stateCache, bc.snaps)
statedb.ConcurrentUpdateThreshold = bc.cacheConfig.ConcurrentUpdateThreshold
if err != nil {
return it.index, err
}
Expand Down
55 changes: 45 additions & 10 deletions core/state/state_object.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ func (s *stateObject) finalise(prefetch bool) {

// updateTrie writes cached storage modifications into the object's storage trie.
// It will return nil if the trie has not been loaded and no changes have been made
func (s *stateObject) updateTrie(db Database) Trie {
func (s *stateObject) updateTrie(db Database, skipTrieUpdate bool) Trie {
// Make sure all dirty slots are finalized into the pending storage area
s.finalise(false) // Don't prefetch anymore, pull directly if need be
if len(s.pendingStorage) == 0 {
Expand All @@ -332,25 +332,32 @@ func (s *stateObject) updateTrie(db Database) Trie {
// The snapshot storage map for the object
var storage map[common.Hash][]byte
// Insert all the pending updates into the trie
tr := s.getTrie(db)
var tr Trie
if !skipTrieUpdate {
tr = s.getTrie(db)
}
hasher := s.db.hasher

usedStorage := make([][]byte, 0, len(s.pendingStorage))
for key, value := range s.pendingStorage {
// Skip noop changes, persist actual changes
if value == s.originStorage[key] {
continue
}
s.originStorage[key] = value

if !skipTrieUpdate {
s.originStorage[key] = value
}
var v []byte
if (value == common.Hash{}) {
s.setError(tr.TryDelete(key[:]))
if !skipTrieUpdate {
s.setError(tr.TryDelete(key[:]))
}
s.db.StorageDeleted += 1
} else {
// Encoding []byte cannot fail, ok to ignore the error.
v, _ = rlp.EncodeToBytes(common.TrimLeftZeroes(value[:]))
s.setError(tr.TryUpdate(key[:], v))
if !skipTrieUpdate {
s.setError(tr.TryUpdate(key[:], v))
}
s.db.StorageUpdated += 1
}
// If state snapshotting is active, cache the data til commit
Expand All @@ -369,16 +376,44 @@ func (s *stateObject) updateTrie(db Database) Trie {
if s.db.prefetcher != nil {
s.db.prefetcher.used(s.data.Root, usedStorage)
}
if !skipTrieUpdate {
if len(s.pendingStorage) > 0 {
s.pendingStorage = make(Storage)
}
}
return tr
}

func (s *stateObject) updateTrieConcurrent(db Database) {
tr := s.getTrie(db)
for key, value := range s.pendingStorage {
// Skip noop changes, persist actual changes
if value == s.originStorage[key] {
continue
}
s.originStorage[key] = value

var v []byte

if (value == common.Hash{}) {
s.setError(tr.TryDelete(key[:]))
} else {
v, _ = rlp.EncodeToBytes(common.TrimLeftZeroes(value[:]))
s.setError(tr.TryUpdate(key[:], v))
}
}
if len(s.pendingStorage) > 0 {
s.pendingStorage = make(Storage)
}
return tr
if tr != nil {
s.data.Root = s.trie.Hash()
}
}

// UpdateRoot sets the trie root to the current root hash of
func (s *stateObject) updateRoot(db Database) {
// If nothing changed, don't bother with hashing anything
if s.updateTrie(db) == nil {
if s.updateTrie(db, false) == nil {
return
}
// Track the amount of time wasted on hashing the storage trie
Expand All @@ -392,7 +427,7 @@ func (s *stateObject) updateRoot(db Database) {
// This updates the trie root.
func (s *stateObject) CommitTrie(db Database) (int, error) {
// If nothing changed, don't bother with hashing anything
if s.updateTrie(db) == nil {
if s.updateTrie(db, false) == nil {
return 0, nil
}
if s.dbErr != nil {
Expand Down
54 changes: 53 additions & 1 deletion core/state/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import (
"errors"
"fmt"
"math/big"
"runtime"
"sort"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -122,6 +124,9 @@ type StateDB struct {
StorageUpdated int
AccountDeleted int
StorageDeleted int

// Minimum stateObjects (updating accounts) to apply concurrent updates, 0 to disable
ConcurrentUpdateThreshold int
}

// New creates a new state from a given trie.
Expand Down Expand Up @@ -358,7 +363,7 @@ func (s *StateDB) StorageTrie(addr common.Address) Trie {
return nil
}
cpy := stateObject.deepCopy(s)
cpy.updateTrie(s.db)
cpy.updateTrie(s.db, false)
return cpy.getTrie(s.db)
}

Expand Down Expand Up @@ -855,11 +860,58 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash {
// the account prefetcher. Instead, let's process all the storage updates
// first, giving the account prefeches just a few more milliseconds of time
// to pull useful data from disk.

// Get the stateObjects needed to be updated
updateObjs := []*stateObject{}
for addr := range s.stateObjectsPending {
if obj := s.stateObjects[addr]; !obj.deleted {
updateObjs = append(updateObjs, obj)
}
}

if len(updateObjs) < s.ConcurrentUpdateThreshold || s.ConcurrentUpdateThreshold == 0 {
// Update the state trie sequentially
for _, obj := range updateObjs {
obj.updateRoot(s.db)
}
} else {
// Update the state trie concurrently
for _, obj := range updateObjs {
obj.updateTrie(s.db, true)
}

nGoroutines := runtime.NumCPU()
if nGoroutines > len(updateObjs) {
nGoroutines = len(updateObjs)
}

if nGoroutines != 0 {
nObjectsPerRoutine := len(updateObjs) / nGoroutines
nObjectsRemaining := len(updateObjs) % nGoroutines
wg := sync.WaitGroup{}
wg.Add(nGoroutines)
i := 0
for {
nObjects := nObjectsPerRoutine
if nObjectsRemaining > 0 {
nObjects++
nObjectsRemaining--
}
go func(objs []*stateObject) {
defer wg.Done()
for _, obj := range objs {
obj.updateTrieConcurrent(s.db)
}
}(updateObjs[i : i+nObjects])
i += nObjects
if i == len(updateObjs) {
break
}
}
wg.Wait()
}
}

// Now we're about to start to write changes to the trie. The trie is so far
// _untouched_. We can check with the prefetcher, if it can give us a trie
// which has the same root, but also has some content loaded into it.
Expand Down
39 changes: 39 additions & 0 deletions core/state/statedb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"sync"
"testing"
"testing/quick"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
Expand Down Expand Up @@ -915,3 +916,41 @@ func TestStateDBAccessList(t *testing.T) {
t.Fatalf("expected empty, got %d", got)
}
}

func TestIntermediateUpdateConcurrently(t *testing.T) {
rng := rand.New(rand.NewSource(time.Now().Unix()))
// Create an empty state
db1 := rawdb.NewMemoryDatabase()
db2 := rawdb.NewMemoryDatabase()
state1, _ := New(common.Hash{}, NewDatabase(db1), nil)
state2, _ := New(common.Hash{}, NewDatabase(db2), nil)

// Update it with random data
for i := int64(0); i < 1000; i++ {
addr := common.BigToAddress(big.NewInt(i))
balance := big.NewInt(int64(rng.Int63()))
nonce := rng.Uint64()
key := common.BigToHash(big.NewInt(int64(rng.Int63())))
value := common.BigToHash(big.NewInt(int64(rng.Int63())))
code := []byte{byte(rng.Uint64()), byte(rng.Uint64()), byte(rng.Uint64())}
state1.SetBalance(addr, balance)
state2.SetBalance(addr, balance)
state1.SetNonce(addr, nonce)
state2.SetNonce(addr, nonce)
state1.SetState(addr, key, value)
state2.SetState(addr, key, value)
state1.SetCode(addr, code)
state2.SetCode(addr, code)
}

state1.ConcurrentUpdateThreshold = 0
state2.ConcurrentUpdateThreshold = 1

root1 := state1.IntermediateRoot(false) // sequential
root2 := state2.IntermediateRoot(false) // concurrent

if root1 != root2 {
t.Fatalf("intermediate roots mismatch: %v != %v", root1.Hex(), root2.Hex())
}

}
21 changes: 11 additions & 10 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,16 +187,17 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
EnablePreimageRecording: config.EnablePreimageRecording,
}
cacheConfig = &core.CacheConfig{
TrieCleanLimit: config.TrieCleanCache,
TrieCleanJournal: stack.ResolvePath(config.TrieCleanCacheJournal),
TrieCleanRejournal: config.TrieCleanCacheRejournal,
TrieCleanNoPrefetch: config.NoPrefetch,
TrieDirtyLimit: config.TrieDirtyCache,
TrieDirtyDisabled: config.NoPruning,
TrieTimeLimit: config.TrieTimeout,
SnapshotLimit: config.SnapshotCache,
Preimages: config.Preimages,
TriesInMemory: config.TriesInMemory,
TrieCleanLimit: config.TrieCleanCache,
TrieCleanJournal: stack.ResolvePath(config.TrieCleanCacheJournal),
TrieCleanRejournal: config.TrieCleanCacheRejournal,
TrieCleanNoPrefetch: config.NoPrefetch,
TrieDirtyLimit: config.TrieDirtyCache,
TrieDirtyDisabled: config.NoPruning,
TrieTimeLimit: config.TrieTimeout,
SnapshotLimit: config.SnapshotCache,
Preimages: config.Preimages,
TriesInMemory: config.TriesInMemory,
ConcurrentUpdateThreshold: config.ConcurrentUpdateThreshold,
}
)
eth.blockchain, err = core.NewBlockChain(chainDb, cacheConfig, chainConfig, eth.engine, vmConfig, eth.shouldPreserve, &config.TxLookupLimit)
Expand Down
3 changes: 3 additions & 0 deletions eth/ethconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,9 @@ type Config struct {

// Send additional chain event
EnableAdditionalChainEvent bool

// Minimum stateObjects (updating accounts) to apply concurrent updates, 0 to disable
ConcurrentUpdateThreshold int
}

// CreateConsensusEngine creates a consensus engine for the given chain configuration.
Expand Down
Loading