diff --git a/core/block_validator.go b/core/block_validator.go index 008444fbbc45..afc89012f1a1 100644 --- a/core/block_validator.go +++ b/core/block_validator.go @@ -160,6 +160,7 @@ func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateD } else if res.Requests != nil { return errors.New("block has requests before prague fork") } + // Validate the state root against the received state root and throw // an error if they don't match. if root := statedb.IntermediateRoot(v.config.IsEIP158(header.Number)); header.Root != root { diff --git a/core/state/database.go b/core/state/database.go index b46e5d500d64..93a3bf81ebde 100644 --- a/core/state/database.go +++ b/core/state/database.go @@ -91,12 +91,18 @@ type Trie interface { // in the trie with provided address. UpdateAccount(address common.Address, account *types.StateAccount, codeLen int) error + // UpdateAccountBatch attempts to update a list accounts in the batch manner. + UpdateAccountBatch(addresses []common.Address, accounts []*types.StateAccount, _ []int) error + // UpdateStorage associates key with value in the trie. If value has length zero, // any existing value is deleted from the trie. The value bytes must not be modified // by the caller while they are stored in the trie. If a node was not found in the // database, a trie.MissingNodeError is returned. UpdateStorage(addr common.Address, key, value []byte) error + // UpdateStorageBatch attempts to update a list storages in the batch manner. + UpdateStorageBatch(_ common.Address, keys [][]byte, values [][]byte) error + // DeleteAccount abstracts an account deletion from the trie. DeleteAccount(address common.Address) error diff --git a/core/state/state_object.go b/core/state/state_object.go index 767f469bfde5..0269a6217945 100644 --- a/core/state/state_object.go +++ b/core/state/state_object.go @@ -322,8 +322,10 @@ func (s *stateObject) updateTrie() (Trie, error) { // into a shortnode. This requires `B` to be resolved from disk. // Whereas if the created node is handled first, then the collapse is avoided, and `B` is not resolved. var ( - deletions []common.Hash - used = make([]common.Hash, 0, len(s.uncommittedStorage)) + deletions []common.Hash + used = make([]common.Hash, 0, len(s.uncommittedStorage)) + updateKeys [][]byte + updateValues [][]byte ) for key, origin := range s.uncommittedStorage { // Skip noop changes, persist actual changes @@ -337,10 +339,8 @@ func (s *stateObject) updateTrie() (Trie, error) { continue } if (value != common.Hash{}) { - if err := tr.UpdateStorage(s.address, key[:], common.TrimLeftZeroes(value[:])); err != nil { - s.db.setError(err) - return nil, err - } + updateKeys = append(updateKeys, key[:]) + updateValues = append(updateValues, common.TrimLeftZeroes(value[:])) s.db.StorageUpdated.Add(1) } else { deletions = append(deletions, key) @@ -348,6 +348,12 @@ func (s *stateObject) updateTrie() (Trie, error) { // Cache the items for preloading used = append(used, key) // Copy needed for closure } + if len(updateKeys) > 0 { + if err := tr.UpdateStorageBatch(common.Address{}, updateKeys, updateValues); err != nil { + s.db.setError(err) + return nil, err + } + } for _, key := range deletions { if err := tr.DeleteStorage(s.address, key[:]); err != nil { s.db.setError(err) diff --git a/core/state/statedb.go b/core/state/statedb.go index efb09a08a02b..e889e27d8ff0 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -574,6 +574,25 @@ func (s *StateDB) updateStateObject(obj *stateObject) { s.trie.UpdateContractCode(obj.Address(), common.BytesToHash(obj.CodeHash()), obj.code) } } +func (s *StateDB) updateStateObjects(objs []*stateObject) { + var addrs []common.Address + var accts []*types.StateAccount + + for _, obj := range objs { + addrs = append(addrs, obj.Address()) + accts = append(accts, &obj.data) + } + + if err := s.trie.UpdateAccountBatch(addrs, accts, nil); err != nil { + s.setError(fmt.Errorf("updateStateObjects error: %v", err)) + } + + for _, obj := range objs { + if obj.dirtyCode { + s.trie.UpdateContractCode(obj.Address(), common.BytesToHash(obj.CodeHash()), obj.code) + } + } +} // deleteStateObject removes the given object from the state trie. func (s *StateDB) deleteStateObject(addr common.Address) { @@ -883,6 +902,31 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash { workers.Wait() s.StorageUpdates += time.Since(start) + /* + fmt.Println("begin print mutations") + fmt.Println(s.txIndex) + for addr, _ := range s.mutations { + if _, ok := s.stateObjects[addr]; !ok { + continue + } + if s.stateObjects[addr].trie == nil { + continue + } + fmt.Printf("mut %x/%x:\n", addr, s.stateObjects[addr].addrHash) + it, err := s.stateObjects[addr].trie.NodeIterator([]byte{}) + if err != nil { + panic(err) + } + for it.Next(true) { + if it.Leaf() { + fmt.Printf("%x: %x\n", it.Path(), it.LeafBlob()) + } else { + fmt.Printf("%x: %x\n", it.Path(), it.Hash()) + } + } + } + */ + // Now we're about to start to write changes to the trie. The trie is so far // _untouched_. We can check with the prefetcher, if it can give us a trie // which has the same root, but also has some content loaded into it. @@ -911,6 +955,7 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash { var ( usedAddrs []common.Address deletedAddrs []common.Address + updatedObjs []*stateObject ) for addr, op := range s.mutations { if op.applied { @@ -921,11 +966,14 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash { if op.isDelete() { deletedAddrs = append(deletedAddrs, addr) } else { - s.updateStateObject(s.stateObjects[addr]) + updatedObjs = append(updatedObjs, s.stateObjects[addr]) s.AccountUpdated += 1 } usedAddrs = append(usedAddrs, addr) // Copy needed for closure } + if len(updatedObjs) > 0 { + s.updateStateObjects(updatedObjs) + } for _, deletedAddr := range deletedAddrs { s.deleteStateObject(deletedAddr) s.AccountDeleted += 1 @@ -937,7 +985,6 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash { } // Track the amount of time wasted on hashing the account trie defer func(start time.Time) { s.AccountHashes += time.Since(start) }(time.Now()) - hash := s.trie.Hash() // If witness building is enabled, gather the account trie witness diff --git a/tests/block_test.go b/tests/block_test.go index 91d9f2e653a0..82eeb534fc24 100644 --- a/tests/block_test.go +++ b/tests/block_test.go @@ -105,7 +105,7 @@ func execBlockTest(t *testing.T, bt *testMatcher, test *BlockTest) { } for _, snapshot := range snapshotConf { for _, dbscheme := range dbschemeConf { - if err := bt.checkFailure(t, test.Run(snapshot, dbscheme, true, nil, nil)); err != nil { + if err := bt.checkFailure(t, test.Run(snapshot, dbscheme, false, nil, nil)); err != nil { t.Errorf("test with config {snapshotter:%v, scheme:%v} failed: %v", snapshot, dbscheme, err) return } diff --git a/trie/secure_trie.go b/trie/secure_trie.go index 0424ecb6e515..d3ce17cb0c2e 100644 --- a/trie/secure_trie.go +++ b/trie/secure_trie.go @@ -190,6 +190,29 @@ func (t *StateTrie) UpdateStorage(_ common.Address, key, value []byte) error { return nil } +// UpdateStorageBatch attempts to update a list storages in the batch manner. +func (t *StateTrie) UpdateStorageBatch(_ common.Address, keys [][]byte, values [][]byte) error { + var ( + hkeys = make([][]byte, 0, len(keys)) + evals = make([][]byte, 0, len(values)) + ) + for _, key := range keys { + hk := crypto.Keccak256(key) + if t.preimages != nil { + t.secKeyCache[common.Hash(hk)] = key + } + hkeys = append(hkeys, hk) + } + for _, val := range values { + data, err := rlp.EncodeToBytes(val) + if err != nil { + return err + } + evals = append(evals, data) + } + return t.trie.UpdateBatch(hkeys, evals) +} + // UpdateAccount will abstract the write of an account to the secure trie. func (t *StateTrie) UpdateAccount(address common.Address, acc *types.StateAccount, _ int) error { hk := crypto.Keccak256(address.Bytes()) @@ -206,6 +229,29 @@ func (t *StateTrie) UpdateAccount(address common.Address, acc *types.StateAccoun return nil } +// UpdateAccountBatch attempts to update a list accounts in the batch manner. +func (t *StateTrie) UpdateAccountBatch(addresses []common.Address, accounts []*types.StateAccount, _ []int) error { + var ( + hkeys = make([][]byte, 0, len(addresses)) + values = make([][]byte, 0, len(accounts)) + ) + for _, addr := range addresses { + hk := crypto.Keccak256(addr.Bytes()) + if t.preimages != nil { + t.secKeyCache[common.Hash(hk)] = addr.Bytes() + } + hkeys = append(hkeys, hk) + } + for _, acc := range accounts { + data, err := rlp.EncodeToBytes(acc) + if err != nil { + return err + } + values = append(values, data) + } + return t.trie.UpdateBatch(hkeys, values) +} + func (t *StateTrie) UpdateContractCode(_ common.Address, _ common.Hash, _ []byte) error { return nil } diff --git a/trie/tracer.go b/trie/tracer.go index 206e8aa20d80..a9ef1805b960 100644 --- a/trie/tracer.go +++ b/trie/tracer.go @@ -19,6 +19,7 @@ package trie import ( "maps" "slices" + "sync" ) // opTracer tracks the changes of trie nodes. During the trie operations, @@ -33,12 +34,10 @@ import ( // while the latter is inserted/deleted in order to follow the rule of trie. // This tool can track all of them no matter the node is embedded in its // parent or not, but valueNode is never tracked. -// -// Note opTracer is not thread-safe, callers should be responsible for handling -// the concurrency issues by themselves. type opTracer struct { inserts map[string]struct{} deletes map[string]struct{} + lock sync.RWMutex } // newOpTracer initializes the tracer for capturing trie changes. @@ -53,6 +52,9 @@ func newOpTracer() *opTracer { // in the deletion set (resurrected node), then just wipe it from // the deletion set as it's "untouched". func (t *opTracer) onInsert(path []byte) { + t.lock.Lock() + defer t.lock.Unlock() + if _, present := t.deletes[string(path)]; present { delete(t.deletes, string(path)) return @@ -64,6 +66,9 @@ func (t *opTracer) onInsert(path []byte) { // in the addition set, then just wipe it from the addition set // as it's untouched. func (t *opTracer) onDelete(path []byte) { + t.lock.Lock() + defer t.lock.Unlock() + if _, present := t.inserts[string(path)]; present { delete(t.inserts, string(path)) return @@ -73,12 +78,18 @@ func (t *opTracer) onDelete(path []byte) { // reset clears the content tracked by tracer. func (t *opTracer) reset() { + t.lock.Lock() + defer t.lock.Unlock() + clear(t.inserts) clear(t.deletes) } // copy returns a deep copied tracer instance. func (t *opTracer) copy() *opTracer { + t.lock.RLock() + defer t.lock.RUnlock() + return &opTracer{ inserts: maps.Clone(t.inserts), deletes: maps.Clone(t.deletes), @@ -87,6 +98,9 @@ func (t *opTracer) copy() *opTracer { // deletedList returns a list of node paths which are deleted from the trie. func (t *opTracer) deletedList() [][]byte { + t.lock.RLock() + defer t.lock.RUnlock() + paths := make([][]byte, 0, len(t.deletes)) for path := range t.deletes { paths = append(paths, []byte(path)) @@ -97,11 +111,9 @@ func (t *opTracer) deletedList() [][]byte { // prevalueTracer tracks the original values of resolved trie nodes. Cached trie // node values are expected to be immutable. A zero-size node value is treated as // non-existent and should not occur in practice. -// -// Note prevalueTracer is not thread-safe, callers should be responsible for -// handling the concurrency issues by themselves. type prevalueTracer struct { data map[string][]byte + lock sync.RWMutex } // newPrevalueTracer initializes the tracer for capturing resolved trie nodes. @@ -115,18 +127,27 @@ func newPrevalueTracer() *prevalueTracer { // blob internally. Do not modify the value outside this function, // as it is not deep-copied. func (t *prevalueTracer) put(path []byte, val []byte) { + t.lock.Lock() + defer t.lock.Unlock() + t.data[string(path)] = val } // get returns the cached trie node value. If the node is not found, nil will // be returned. func (t *prevalueTracer) get(path []byte) []byte { + t.lock.RLock() + defer t.lock.RUnlock() + return t.data[string(path)] } // hasList returns a list of flags indicating whether the corresponding trie nodes // specified by the path exist in the trie. func (t *prevalueTracer) hasList(list [][]byte) []bool { + t.lock.RLock() + defer t.lock.RUnlock() + exists := make([]bool, 0, len(list)) for _, path := range list { _, ok := t.data[string(path)] @@ -137,16 +158,25 @@ func (t *prevalueTracer) hasList(list [][]byte) []bool { // values returns a list of values of the cached trie nodes. func (t *prevalueTracer) values() [][]byte { + t.lock.RLock() + defer t.lock.RUnlock() + return slices.Collect(maps.Values(t.data)) } // reset resets the cached content in the prevalueTracer. func (t *prevalueTracer) reset() { + t.lock.Lock() + defer t.lock.Unlock() + clear(t.data) } // copy returns a copied prevalueTracer instance. func (t *prevalueTracer) copy() *prevalueTracer { + t.lock.RLock() + defer t.lock.RUnlock() + // Shadow clone is used, as the cached trie node values are immutable return &prevalueTracer{ data: maps.Clone(t.data), diff --git a/trie/trie.go b/trie/trie.go index 307036faa99d..ade5c06e4997 100644 --- a/trie/trie.go +++ b/trie/trie.go @@ -28,6 +28,7 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/trie/trienode" "github.com/ethereum/go-ethereum/triedb/database" + "golang.org/x/sync/errgroup" ) // Trie represents a Merkle Patricia Trie. Use New to create a trie that operates @@ -406,6 +407,72 @@ func (t *Trie) insert(n node, prefix, key []byte, value node) (bool, node, error } } +// UpdateBatch updates a batch of entries concurrently. +func (t *Trie) UpdateBatch(keys [][]byte, values [][]byte) error { + // Short circuit if the trie is already committed and unusable. + if t.committed { + return ErrCommitted + } + if len(keys) != len(values) { + return fmt.Errorf("keys and values length mismatch: %d != %d", len(keys), len(values)) + } + // Insert the entries sequentially if there are not too many + // trie nodes in the trie. + fn, ok := t.root.(*fullNode) + + if !ok || len(keys) < 4 { // TODO(rjl493456442) the parallelism threshold should be twisted + for i, key := range keys { + err := t.Update(key, values[i]) + if err != nil { + return err + } + } + return nil + } + var ( + ikeys = make(map[byte][][]byte) + ivals = make(map[byte][][]byte) + eg errgroup.Group + ) + for i, key := range keys { + hkey := keybytesToHex(key) + ikeys[hkey[0]] = append(ikeys[hkey[0]], hkey) + ivals[hkey[0]] = append(ivals[hkey[0]], values[i]) + } + if len(keys) > 0 { + fn.flags = t.newFlag() + } + for p, k := range ikeys { + pos := p + ks := k + eg.Go(func() error { + vs := ivals[pos] + for i, k := range ks { + if len(vs[i]) != 0 { + _, n, err := t.insert(fn.Children[pos], []byte{pos}, k[1:], valueNode(vs[i])) + if err != nil { + return err + } + fn.Children[pos] = n + } else { + _, n, err := t.delete(fn.Children[pos], []byte{pos}, k[1:]) + if err != nil { + return err + } + fn.Children[pos] = n + } + } + return nil + }) + } + if err := eg.Wait(); err != nil { + return err + } + t.unhashed += len(keys) + t.uncommitted += len(keys) + return nil +} + // MustDelete is a wrapper of Delete and will omit any encountered error but // just print out an error message. func (t *Trie) MustDelete(key []byte) { diff --git a/trie/trie_test.go b/trie/trie_test.go index 68759c37c0b8..37580407ee0c 100644 --- a/trie/trie_test.go +++ b/trie/trie_test.go @@ -1499,3 +1499,57 @@ func testTrieCopyNewTrie(t *testing.T, entries []kv) { t.Errorf("Hash mismatch: old %v, new %v", hash, tr.Hash()) } } + +func TestUpdateBatch(t *testing.T) { + testUpdateBatch(t, []kv{ + {k: []byte("do"), v: []byte("verb")}, + {k: []byte("ether"), v: []byte("wookiedoo")}, + {k: []byte("horse"), v: []byte("stallion")}, + {k: []byte("shaman"), v: []byte("horse")}, + {k: []byte("doge"), v: []byte("coin")}, + {k: []byte("dog"), v: []byte("puppy")}, + }) + + var entries []kv + for i := 0; i < 256; i++ { + entries = append(entries, kv{k: testrand.Bytes(32), v: testrand.Bytes(32)}) + } + testUpdateBatch(t, entries) +} + +func testUpdateBatch(t *testing.T, entries []kv) { + var ( + base = NewEmpty(nil) + keys [][]byte + vals [][]byte + ) + for _, entry := range entries { + base.Update(entry.k, entry.v) + keys = append(keys, entry.k) + vals = append(vals, entry.v) + } + for i := 0; i < 10; i++ { + k, v := testrand.Bytes(32), testrand.Bytes(32) + base.Update(k, v) + keys = append(keys, k) + vals = append(vals, v) + } + + cmp := NewEmpty(nil) + if err := cmp.UpdateBatch(keys, vals); err != nil { + t.Fatalf("Failed to update batch, %v", err) + } + + // Traverse the original tree, the changes made on the copy one shouldn't + // affect the old one + for _, key := range keys { + v1, _ := base.Get(key) + v2, _ := cmp.Get(key) + if !bytes.Equal(v1, v2) { + t.Errorf("Unexpected data, key: %v, want: %v, got: %v", key, v1, v2) + } + } + if base.Hash() != cmp.Hash() { + t.Errorf("Hash mismatch: want %x, got %x", base.Hash(), cmp.Hash()) + } +} diff --git a/trie/verkle.go b/trie/verkle.go index c89a8f1d368c..f6420c4ecd99 100644 --- a/trie/verkle.go +++ b/trie/verkle.go @@ -155,6 +155,22 @@ func (t *VerkleTrie) UpdateAccount(addr common.Address, acc *types.StateAccount, return nil } +// UpdateAccountBatch attempts to update a list accounts in the batch manner. +func (t *VerkleTrie) UpdateAccountBatch(addresses []common.Address, accounts []*types.StateAccount, codeLens []int) error { + if len(addresses) != len(accounts) { + return fmt.Errorf("address and accounts length mismatch: %d != %d", len(addresses), len(accounts)) + } + if len(addresses) != len(codeLens) { + return fmt.Errorf("address and code length mismatch: %d != %d", len(addresses), len(codeLens)) + } + for i, addr := range addresses { + if err := t.UpdateAccount(addr, accounts[i], codeLens[i]); err != nil { + return err + } + } + return nil +} + // UpdateStorage implements state.Trie, writing the provided storage slot into // the tree. If the tree is corrupted, an error will be returned. func (t *VerkleTrie) UpdateStorage(address common.Address, key, value []byte) error { @@ -169,6 +185,19 @@ func (t *VerkleTrie) UpdateStorage(address common.Address, key, value []byte) er return t.root.Insert(k, v[:], t.nodeResolver) } +// UpdateStorageBatch attempts to update a list storages in the batch manner. +func (t *VerkleTrie) UpdateStorageBatch(address common.Address, keys [][]byte, values [][]byte) error { + if len(keys) != len(values) { + return fmt.Errorf("keys and values length mismatch: %d != %d", len(keys), len(values)) + } + for i, key := range keys { + if err := t.UpdateStorage(address, key, values[i]); err != nil { + return err + } + } + return nil +} + // DeleteAccount leaves the account untouched, as no account deletion can happen // in verkle. // There is a special corner case, in which an account that is prefunded, CREATE2-d