diff --git a/accounts/abi/bind/backends/simulated.go b/accounts/abi/bind/backends/simulated.go index 907510a9bf4b..313eff6f619b 100644 --- a/accounts/abi/bind/backends/simulated.go +++ b/accounts/abi/bind/backends/simulated.go @@ -218,10 +218,9 @@ func (b *SimulatedBackend) Rollback() { func (b *SimulatedBackend) rollback(parent *types.Block) { blocks, _ := core.GenerateChain(b.config, parent, b.blockchain.Engine(), b.database, 1, func(int, *core.BlockGen) {}) - stateDB, _ := b.blockchain.State() b.pendingBlock = blocks[0] - b.pendingState, _ = state.New(b.pendingBlock.Root(), stateDB.Database()) + b.pendingState, _ = state.New(b.pendingBlock.Root(), b.blockchain.StateCache()) } // Fork creates a side-chain that can be used to simulate reorgs. diff --git a/core/blockchain.go b/core/blockchain.go index 675278d05aa7..9f0d64c8b6cf 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -207,9 +207,9 @@ type BlockChain struct { procInterrupt atomic.Bool // interrupt signaler for block processing engine consensus.Engine - validator Validator // Block and state validator interface - prefetcher Prefetcher // Block state prefetcher interface - processor Processor // Block transaction processor interface + validator Validator // Block and state validator interface + prefetcher Prefetcher + processor Processor // Block transaction processor interface vmConfig vm.Config IPCEndpoint string @@ -1723,6 +1723,9 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] if err != nil { return it.index, events, coalescedLogs, err } + // Enable prefetching to pull in trie node paths while processing transactions + statedb.StartPrefetcher("chain") + defer statedb.StopPrefetcher() // stopped on write anyway, defer meant to catch early error returns // If we have a followup block, run that against the current state to pre-cache // transactions and probabilistically some of the account/storage trie nodes. @@ -1740,9 +1743,8 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] }(time.Now()) } } - // Process block using the parent state as reference point. 
- t0 := time.Now() + substart := time.Now() isTIPXDCXReceiver := bc.Config().IsTIPXDCXReceiver(block.Number()) tradingState, lendingState, err := bc.processTradingAndLendingStates(isTIPXDCXReceiver, block, parent, statedb) if err != nil { @@ -1752,47 +1754,50 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] } feeCapacity := state.GetTRC21FeeCapacityFromStateWithCache(parent.Root, statedb) receipts, logs, usedGas, err := bc.processor.Process(block, statedb, tradingState, bc.vmConfig, feeCapacity) - t1 := time.Now() if err != nil { bc.reportBlock(block, receipts, err) followupInterrupt.Store(true) return it.index, events, coalescedLogs, err } + // Update the metrics touched during block processing + accountReadTimer.Update(statedb.AccountReads) // Account reads are complete, we can mark them + storageReadTimer.Update(statedb.StorageReads) // Storage reads are complete, we can mark them + accountUpdateTimer.Update(statedb.AccountUpdates) // Account updates are complete, we can mark them + storageUpdateTimer.Update(statedb.StorageUpdates) // Storage updates are complete, we can mark them + triehash := statedb.AccountHashes + statedb.StorageHashes // Save to not double count in validation + trieproc := statedb.AccountReads + statedb.AccountUpdates + trieproc += statedb.StorageReads + statedb.StorageUpdates + + blockExecutionTimer.Update(time.Since(substart) - trieproc - triehash) + // Validate the state using the default validator + substart = time.Now() err = bc.validator.ValidateState(block, statedb, receipts, usedGas) if err != nil { bc.reportBlock(block, receipts, err) return it.index, events, coalescedLogs, err } - t2 := time.Now() proctime := time.Since(start) + // Update the metrics touched during block validation + accountHashTimer.Update(statedb.AccountHashes) // Account hashes are complete, we can mark them + storageHashTimer.Update(statedb.StorageHashes) // Storage hashes are complete, we can mark them + + blockValidationTimer.Update(time.Since(substart) - (statedb.AccountHashes + statedb.StorageHashes - triehash)) + // Write the block to the chain and get the status. 
+ substart = time.Now() status, err := bc.writeBlockWithState(block, receipts, statedb, tradingState, lendingState) - t3 := time.Now() followupInterrupt.Store(true) if err != nil { return it.index, events, coalescedLogs, err } + // Update the metrics touched during block commit + accountCommitTimer.Update(statedb.AccountCommits) // Account commits are complete, we can mark them + storageCommitTimer.Update(statedb.StorageCommits) // Storage commits are complete, we can mark them - // Update the metrics subsystem with all the measurements - accountReadTimer.Update(statedb.AccountReads) - accountHashTimer.Update(statedb.AccountHashes) - accountUpdateTimer.Update(statedb.AccountUpdates) - accountCommitTimer.Update(statedb.AccountCommits) - - storageReadTimer.Update(statedb.StorageReads) - storageHashTimer.Update(statedb.StorageHashes) - storageUpdateTimer.Update(statedb.StorageUpdates) - storageCommitTimer.Update(statedb.StorageCommits) - - trieAccess := statedb.AccountReads + statedb.AccountHashes + statedb.AccountUpdates + statedb.AccountCommits - trieAccess += statedb.StorageReads + statedb.StorageHashes + statedb.StorageUpdates + statedb.StorageCommits - + blockWriteTimer.Update(time.Since(substart) - statedb.AccountCommits - statedb.StorageCommits) blockInsertTimer.UpdateSince(start) - blockExecutionTimer.Update(t1.Sub(t0) - trieAccess) - blockValidationTimer.Update(t2.Sub(t1)) - blockWriteTimer.Update(t3.Sub(t2)) switch status { case CanonStatTy: @@ -1816,7 +1821,6 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] events = append(events, ChainSideEvent{block}) bc.UpdateBlocksHashCache(block) } - blockInsertTimer.UpdateSince(start) stats.processed++ stats.usedGas += usedGas diff --git a/core/state/database.go b/core/state/database.go index cd3f8665cf4e..7e3e1b8d70f0 100644 --- a/core/state/database.go +++ b/core/state/database.go @@ -127,12 +127,20 @@ type cachingDB struct { // OpenTrie opens the main account trie at a specific root hash. func (db *cachingDB) OpenTrie(root common.Hash) (Trie, error) { - return trie.NewSecure(root, db.db) + tr, err := trie.NewSecure(root, db.db) + if err != nil { + return nil, err + } + return tr, nil } // OpenStorageTrie opens the storage trie of an account. func (db *cachingDB) OpenStorageTrie(addrHash, root common.Hash) (Trie, error) { - return trie.NewSecure(root, db.db) + tr, err := trie.NewSecure(root, db.db) + if err != nil { + return nil, err + } + return tr, nil } // CopyTrie returns an independent copy of the given trie. 
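Reviewer note: taken together, the insertChain changes above and the state-layer changes below wire a background trie prefetcher into block import. As a reading aid, here is a minimal sketch of the intended call pattern, assuming only APIs this patch introduces or relies on; `importBlock` and `process` are hypothetical stand-ins, not functions from the patch:

```go
package example

import (
	"github.com/XinFinOrg/XDPoSChain/common"
	"github.com/XinFinOrg/XDPoSChain/core"
	"github.com/XinFinOrg/XDPoSChain/core/state"
	"github.com/XinFinOrg/XDPoSChain/core/types"
)

// importBlock sketches how a block importer brackets execution with the new
// prefetcher API. importBlock and process are hypothetical; the StateDB and
// BlockChain calls are the ones this patch adds or uses.
func importBlock(bc *core.BlockChain, parent, block *types.Block,
	process func(*types.Block, *state.StateDB) error) (common.Hash, error) {

	statedb, err := state.New(parent.Root(), bc.StateCache())
	if err != nil {
		return common.Hash{}, err
	}
	// Pull trie node paths into memory in the background while transactions run.
	statedb.StartPrefetcher("chain")
	defer statedb.StopPrefetcher() // no-op if already stopped; catches early error returns

	if err := process(block, statedb); err != nil {
		return common.Hash{}, err
	}
	// IntermediateRoot adopts the prefetched tries when the roots match, so
	// most of the nodes needed for hashing are already hot.
	return statedb.IntermediateRoot(true), nil
}
```

Note that IntermediateRoot tears the prefetcher down itself via its own defer, which is why the outer StopPrefetcher defer is safe to leave in place.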
diff --git a/core/state/state_object.go b/core/state/state_object.go index e0a02fba17bf..31103abad39b 100644 --- a/core/state/state_object.go +++ b/core/state/state_object.go @@ -163,11 +163,20 @@ func (s *stateObject) touch() { func (s *stateObject) getTrie(db Database) Trie { if s.trie == nil { - var err error - s.trie, err = db.OpenStorageTrie(s.addrHash, s.data.Root) - if err != nil { - s.trie, _ = db.OpenStorageTrie(s.addrHash, types.EmptyRootHash) - s.setError(fmt.Errorf("can't create storage trie: %v", err)) + // Try fetching from prefetcher first + // We don't prefetch empty tries + if s.data.Root != types.EmptyRootHash && s.db.prefetcher != nil { + // When the miner is creating the pending state, there is no + // prefetcher + s.trie = s.db.prefetcher.trie(s.data.Root) + } + if s.trie == nil { + var err error + s.trie, err = db.OpenStorageTrie(s.addrHash, s.data.Root) + if err != nil { + s.trie, _ = db.OpenStorageTrie(s.addrHash, common.Hash{}) + s.setError(fmt.Errorf("can't create storage trie: %v", err)) + } } } return s.trie @@ -266,9 +275,16 @@ func (s *stateObject) setState(key, value common.Hash) { // finalise moves all dirty storage slots into the pending area to be hashed or // committed later. It is invoked at the end of every transaction. -func (s *stateObject) finalise() { +func (s *stateObject) finalise(prefetch bool) { + slotsToPrefetch := make([][]byte, 0, len(s.dirtyStorage)) for key, value := range s.dirtyStorage { s.pendingStorage[key] = value + if value != s.originStorage[key] { + slotsToPrefetch = append(slotsToPrefetch, common.CopyBytes(key[:])) // Copy needed for closure + } + } + if s.db.prefetcher != nil && prefetch && len(slotsToPrefetch) > 0 && s.data.Root != types.EmptyRootHash { + s.db.prefetcher.prefetch(s.data.Root, slotsToPrefetch) } if len(s.dirtyStorage) > 0 { s.dirtyStorage = make(Storage) @@ -279,7 +295,7 @@ func (s *stateObject) finalise() { // It will return nil if the trie has not been loaded and no changes have been made func (s *stateObject) updateTrie(db Database) Trie { // Make sure all dirty slots are finalized into the pending storage area - s.finalise() + s.finalise(false) // Don't prefetch any more, pull directly if need be if len(s.pendingStorage) == 0 { return s.trie } @@ -287,6 +303,8 @@ func (s *stateObject) updateTrie(db Database) Trie { defer func(start time.Time) { s.db.StorageUpdates += time.Since(start) }(time.Now()) // Insert all the pending updates into the trie tr := s.getTrie(db) + + usedStorage := make([][]byte, 0, len(s.pendingStorage)) for key, value := range s.pendingStorage { // Skip noop changes, persist actual changes if value == s.originStorage[key] { @@ -294,13 +312,18 @@ func (s *stateObject) updateTrie(db Database) Trie { } s.originStorage[key] = value + var v []byte if (value == common.Hash{}) { s.setError(tr.TryDelete(key[:])) - continue + } else { + // Encoding []byte cannot fail, ok to ignore the error. + v, _ = rlp.EncodeToBytes(common.TrimLeftZeroes(value[:])) + s.setError(tr.TryUpdate(key[:], v)) } - // Encoding []byte cannot fail, ok to ignore the error. 
- v, _ := rlp.EncodeToBytes(common.TrimLeftZeroes(value[:])) - s.setError(tr.TryUpdate(key[:], v)) + usedStorage = append(usedStorage, common.CopyBytes(key[:])) // Copy needed for closure + } + if s.db.prefetcher != nil { + s.db.prefetcher.used(s.data.Root, usedStorage) } if len(s.pendingStorage) > 0 { s.pendingStorage = make(Storage) diff --git a/core/state/state_test.go b/core/state/state_test.go index eedd5931bf33..d754ad9eff88 100644 --- a/core/state/state_test.go +++ b/core/state/state_test.go @@ -159,7 +159,7 @@ func TestSnapshot2(t *testing.T) { state.setStateObject(so0) root, _ := state.Commit(false) - state.Reset(root) + state, _ = New(root, state.db) // and one with deleted == true so1 := state.getStateObject(stateobjaddr1) diff --git a/core/state/statedb.go b/core/state/statedb.go index d3f436acf953..099ffacf3698 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -45,8 +45,10 @@ type revision struct { // * Contracts // * Accounts type StateDB struct { - db Database - trie Trie + db Database + prefetcher *triePrefetcher + originalRoot common.Hash // The pre-state root, before any changes were made + trie Trie // This map holds 'live' objects, which will get modified while processing a state transition. stateObjects map[common.Address]*stateObject @@ -110,6 +112,7 @@ func New(root common.Hash, db Database) (*StateDB, error) { return &StateDB{ db: db, trie: tr, + originalRoot: root, stateObjects: make(map[common.Address]*stateObject), stateObjectsPending: make(map[common.Address]struct{}), stateObjectsDirty: make(map[common.Address]struct{}), @@ -121,6 +124,26 @@ func New(root common.Hash, db Database) (*StateDB, error) { }, nil } +// StartPrefetcher initializes a new trie prefetcher to pull in nodes from the +// state trie concurrently while the state is mutated so that when we reach the +// commit phase, most of the needed data is already hot. +func (s *StateDB) StartPrefetcher(namespace string) { + if s.prefetcher != nil { + s.prefetcher.close() + s.prefetcher = nil + } + s.prefetcher = newTriePrefetcher(s.db, s.originalRoot, namespace) +} + +// StopPrefetcher terminates a running prefetcher and reports any leftover stats +// from the gathered metrics. +func (s *StateDB) StopPrefetcher() { + if s.prefetcher != nil { + s.prefetcher.close() + s.prefetcher = nil + } +} + // setError remembers the first non-nil error it is called with. func (s *StateDB) setError(err error) { if s.dbErr == nil { @@ -132,27 +155,6 @@ func (s *StateDB) Error() error { return s.dbErr } -// Reset clears out all ephemeral state objects from the state db, but keeps -// the underlying state trie to avoid reloading data for the next operations. 
-func (s *StateDB) Reset(root common.Hash) error {
-	tr, err := s.db.OpenTrie(root)
-	if err != nil {
-		return err
-	}
-	s.trie = tr
-	s.stateObjects = make(map[common.Address]*stateObject)
-	s.stateObjectsPending = make(map[common.Address]struct{})
-	s.stateObjectsDirty = make(map[common.Address]struct{})
-	s.thash = common.Hash{}
-	s.txIndex = 0
-	s.logs = make(map[common.Hash][]*types.Log)
-	s.logSize = 0
-	s.preimages = make(map[common.Hash][]byte)
-	s.clearJournalAndRefund()
-	s.accessList = newAccessList()
-	return nil
-}
-
 func (s *StateDB) AddLog(log *types.Log) {
 	s.journal.append(addLogChange{txhash: s.thash})
 
@@ -699,6 +701,12 @@ func (s *StateDB) Copy() *StateDB {
 
 	state.transientStorage = s.transientStorage.Copy()
 
+	// If there's a prefetcher running, make an inactive copy of it that can
+	// only access data but does not actively preload (since the user will not
+	// know that they need to explicitly terminate an active copy).
+	if s.prefetcher != nil {
+		state.prefetcher = s.prefetcher.copy()
+	}
 	return state
 }
 
@@ -735,6 +743,7 @@ func (s *StateDB) GetRefund() uint64 {
 // the journal as well as the refunds. Finalise, however, will not push any updates
 // into the tries just yet. Only IntermediateRoot or Commit will do that.
 func (s *StateDB) Finalise(deleteEmptyObjects bool) {
+	addressesToPrefetch := make([][]byte, 0, len(s.journal.dirties))
 	for addr := range s.journal.dirties {
 		obj, exist := s.stateObjects[addr]
 		if !exist {
@@ -744,11 +753,19 @@ func (s *StateDB) Finalise(deleteEmptyObjects bool) {
 		if obj.selfDestructed || (deleteEmptyObjects && obj.empty()) {
 			obj.deleted = true
 		} else {
-			obj.finalise()
+			obj.finalise(true) // Prefetch slots in the background
 		}
 		obj.created = false
 		s.stateObjectsPending[addr] = struct{}{}
 		s.stateObjectsDirty[addr] = struct{}{}
+
+		// At this point, also ship the address off to the precacher. The precacher
+		// will start loading tries, and when the change is eventually committed,
+		// the commit-phase will be a lot faster
+		addressesToPrefetch = append(addressesToPrefetch, common.CopyBytes(addr[:])) // Copy needed for closure
+	}
+	if s.prefetcher != nil && len(addressesToPrefetch) > 0 {
+		s.prefetcher.prefetch(s.originalRoot, addressesToPrefetch)
 	}
 	// Invalidate journal because reverting across transactions is not allowed.
 	s.clearJournalAndRefund()
@@ -761,14 +778,49 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash {
 	// Finalise all the dirty storage states and write them into the tries
 	s.Finalise(deleteEmptyObjects)
 
+	// If there was a trie prefetcher operating, it gets aborted and irrevocably
+	// modified after we start retrieving tries. Remove it from the statedb after
+	// this round of use.
+	//
+	// This is weird pre-byzantium since the first tx runs with a prefetcher and
+	// the remainder without, but pre-byzantium even the initial prefetcher is
+	// useless, so no sleep lost.
+	prefetcher := s.prefetcher
+	if s.prefetcher != nil {
+		defer func() {
+			s.prefetcher.close()
+			s.prefetcher = nil
+		}()
+	}
+	// Although naively it makes sense to retrieve the account trie and then do
+	// the contract storage and account updates sequentially, that short circuits
+	// the account prefetcher. Instead, let's process all the storage updates
+	// first, giving the account prefetches just a few more milliseconds of time
+	// to pull useful data from disk.
 	for addr := range s.stateObjectsPending {
-		obj := s.stateObjects[addr]
-		if obj.deleted {
+		if obj := s.stateObjects[addr]; !obj.deleted {
+			obj.updateRoot(s.db)
+		}
+	}
+	// Now we're about to start to write changes to the trie. The trie is so far
+	// _untouched_. We can check with the prefetcher, if it can give us a trie
+	// which has the same root, but also has some content loaded into it.
+	if prefetcher != nil {
+		if trie := prefetcher.trie(s.originalRoot); trie != nil {
+			s.trie = trie
+		}
+	}
+	usedAddrs := make([][]byte, 0, len(s.stateObjectsPending))
+	for addr := range s.stateObjectsPending {
+		if obj := s.stateObjects[addr]; obj.deleted {
 			s.deleteStateObject(obj)
 		} else {
-			obj.updateRoot(s.db)
 			s.updateStateObject(obj)
 		}
+		usedAddrs = append(usedAddrs, common.CopyBytes(addr[:])) // Copy needed for closure
+	}
+	if prefetcher != nil {
+		prefetcher.used(s.originalRoot, usedAddrs)
 	}
 	if len(s.stateObjectsPending) > 0 {
 		s.stateObjectsPending = make(map[common.Address]struct{})
diff --git a/core/state/statedb_test.go b/core/state/statedb_test.go
index f762e31d4776..77efce19ae4b 100644
--- a/core/state/statedb_test.go
+++ b/core/state/statedb_test.go
@@ -484,7 +484,7 @@ func (test *snapshotTest) checkEqual(state, checkstate *StateDB) error {
 func (s *StateSuite) TestTouchDelete(c *check.C) {
 	s.state.GetOrNewStateObject(common.Address{})
 	root, _ := s.state.Commit(false)
-	s.state.Reset(root)
+	s.state, _ = New(root, s.state.db)
 
 	snapshot := s.state.Snapshot()
 	s.state.AddBalance(common.Address{}, new(big.Int))
@@ -722,7 +722,7 @@ func TestDeleteCreateRevert(t *testing.T) {
 	state.SetBalance(addr, big.NewInt(1))
 	root, _ := state.Commit(false)
-	state.Reset(root)
+	state, _ = New(root, state.db)
 
 	// Simulate self-destructing in one transaction, then create-reverting in another
 	state.SelfDestruct(addr)
@@ -734,7 +734,7 @@ func TestDeleteCreateRevert(t *testing.T) {
 	// Commit the entire state and make sure we don't crash and have the correct state
 	root, _ = state.Commit(true)
-	state.Reset(root)
+	state, _ = New(root, state.db)
 
 	if state.getStateObject(addr) != nil {
 		t.Fatalf("self-destructed contract came alive")
diff --git a/core/state/trie_prefetcher.go b/core/state/trie_prefetcher.go
new file mode 100644
index 000000000000..968d466fa922
--- /dev/null
+++ b/core/state/trie_prefetcher.go
@@ -0,0 +1,334 @@
+// Copyright 2020 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package state
+
+import (
+	"sync"
+
+	"github.com/XinFinOrg/XDPoSChain/common"
+	"github.com/XinFinOrg/XDPoSChain/log"
+	"github.com/XinFinOrg/XDPoSChain/metrics"
+)
+
+var (
+	// triePrefetchMetricsPrefix is the prefix under which to publish the metrics.
+	triePrefetchMetricsPrefix = "trie/prefetch/"
+)
+
+// triePrefetcher is an active prefetcher, which receives accounts or storage
+// items and does trie-loading of them. The goal is to get as much useful content
+// into the caches as possible.
+//
+// Note, the prefetcher's API is not thread safe.
+type triePrefetcher struct {
+	db       Database                    // Database to fetch trie nodes through
+	root     common.Hash                 // Root hash of the account trie for metrics
+	fetches  map[common.Hash]Trie        // Partially or fully fetched tries
+	fetchers map[common.Hash]*subfetcher // Subfetchers for each trie
+
+	deliveryMissMeter *metrics.Meter
+	accountLoadMeter  *metrics.Meter
+	accountDupMeter   *metrics.Meter
+	accountSkipMeter  *metrics.Meter
+	accountWasteMeter *metrics.Meter
+	storageLoadMeter  *metrics.Meter
+	storageDupMeter   *metrics.Meter
+	storageSkipMeter  *metrics.Meter
+	storageWasteMeter *metrics.Meter
+}
+
+// newTriePrefetcher creates a new, active trie prefetcher rooted at the given
+// state root, publishing its metrics under the supplied namespace.
+func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePrefetcher {
+	prefix := triePrefetchMetricsPrefix + namespace
+	p := &triePrefetcher{
+		db:       db,
+		root:     root,
+		fetchers: make(map[common.Hash]*subfetcher), // Active prefetchers use the fetchers map
+
+		deliveryMissMeter: metrics.GetOrRegisterMeter(prefix+"/deliverymiss", nil),
+		accountLoadMeter:  metrics.GetOrRegisterMeter(prefix+"/account/load", nil),
+		accountDupMeter:   metrics.GetOrRegisterMeter(prefix+"/account/dup", nil),
+		accountSkipMeter:  metrics.GetOrRegisterMeter(prefix+"/account/skip", nil),
+		accountWasteMeter: metrics.GetOrRegisterMeter(prefix+"/account/waste", nil),
+		storageLoadMeter:  metrics.GetOrRegisterMeter(prefix+"/storage/load", nil),
+		storageDupMeter:   metrics.GetOrRegisterMeter(prefix+"/storage/dup", nil),
+		storageSkipMeter:  metrics.GetOrRegisterMeter(prefix+"/storage/skip", nil),
+		storageWasteMeter: metrics.GetOrRegisterMeter(prefix+"/storage/waste", nil),
+	}
+	return p
+}
+
+// close iterates over all the subfetchers, aborts any that were left spinning
+// and reports the stats to the metrics subsystem.
+func (p *triePrefetcher) close() {
+	for _, fetcher := range p.fetchers {
+		fetcher.abort() // safe to do multiple times
+
+		if metrics.Enabled() {
+			if fetcher.root == p.root {
+				p.accountLoadMeter.Mark(int64(len(fetcher.seen)))
+				p.accountDupMeter.Mark(int64(fetcher.dups))
+				p.accountSkipMeter.Mark(int64(len(fetcher.tasks)))
+
+				for _, key := range fetcher.used {
+					delete(fetcher.seen, string(key))
+				}
+				p.accountWasteMeter.Mark(int64(len(fetcher.seen)))
+			} else {
+				p.storageLoadMeter.Mark(int64(len(fetcher.seen)))
+				p.storageDupMeter.Mark(int64(fetcher.dups))
+				p.storageSkipMeter.Mark(int64(len(fetcher.tasks)))
+
+				for _, key := range fetcher.used {
+					delete(fetcher.seen, string(key))
+				}
+				p.storageWasteMeter.Mark(int64(len(fetcher.seen)))
+			}
+		}
+	}
+	// Clear out all fetchers (will crash on a second call, deliberate)
+	p.fetchers = nil
+}
+
+// copy creates a deep-but-inactive copy of the trie prefetcher. Any trie data
+// already loaded will be copied over, but no goroutines will be started. This
+// is mostly used in the miner which creates a copy of its actively mutated
+// state to be sealed while it may further mutate the state.
+func (p *triePrefetcher) copy() *triePrefetcher {
+	copy := &triePrefetcher{
+		db:      p.db,
+		root:    p.root,
+		fetches: make(map[common.Hash]Trie), // Active prefetchers use the fetches map
+
+		deliveryMissMeter: p.deliveryMissMeter,
+		accountLoadMeter:  p.accountLoadMeter,
+		accountDupMeter:   p.accountDupMeter,
+		accountSkipMeter:  p.accountSkipMeter,
+		accountWasteMeter: p.accountWasteMeter,
+		storageLoadMeter:  p.storageLoadMeter,
+		storageDupMeter:   p.storageDupMeter,
+		storageSkipMeter:  p.storageSkipMeter,
+		storageWasteMeter: p.storageWasteMeter,
+	}
+	// If the prefetcher is already a copy, duplicate the data
+	if p.fetches != nil {
+		for root, fetch := range p.fetches {
+			copy.fetches[root] = p.db.CopyTrie(fetch)
+		}
+		return copy
+	}
+	// Otherwise we're copying an active fetcher, retrieve the current states
+	for root, fetcher := range p.fetchers {
+		copy.fetches[root] = fetcher.peek()
+	}
+	return copy
+}
+
+// prefetch schedules a batch of trie items to prefetch.
+func (p *triePrefetcher) prefetch(root common.Hash, keys [][]byte) {
+	// If the prefetcher is an inactive one, bail out
+	if p.fetches != nil {
+		return
+	}
+	// Active fetcher, schedule the retrievals
+	fetcher := p.fetchers[root]
+	if fetcher == nil {
+		fetcher = newSubfetcher(p.db, root)
+		p.fetchers[root] = fetcher
+	}
+	fetcher.schedule(keys)
+}
+
+// trie returns the trie matching the root hash, or nil if the prefetcher doesn't
+// have it.
+func (p *triePrefetcher) trie(root common.Hash) Trie {
+	// If the prefetcher is inactive, return from existing deep copies
+	if p.fetches != nil {
+		trie := p.fetches[root]
+		if trie == nil {
+			p.deliveryMissMeter.Mark(1)
+			return nil
+		}
+		return p.db.CopyTrie(trie)
+	}
+	// Otherwise the prefetcher is active, bail if no trie was prefetched for this root
+	fetcher := p.fetchers[root]
+	if fetcher == nil {
+		p.deliveryMissMeter.Mark(1)
+		return nil
+	}
+	// Interrupt the prefetcher if it's by any chance still running and return
+	// a copy of any pre-loaded trie.
+	fetcher.abort() // safe to do multiple times
+
+	trie := fetcher.peek()
+	if trie == nil {
+		p.deliveryMissMeter.Mark(1)
+		return nil
+	}
+	return trie
+}
+
+// used marks a batch of state items used to allow creating statistics as to
+// how useful or wasteful the prefetcher is.
+func (p *triePrefetcher) used(root common.Hash, used [][]byte) {
+	if fetcher := p.fetchers[root]; fetcher != nil {
+		fetcher.used = used
+	}
+}
+
+// subfetcher is a trie fetcher goroutine responsible for pulling entries for a
+// single trie. It is spawned when a new root is encountered and lives until the
+// main prefetcher is paused and either all requested items are processed or the
+// trie being worked on is retrieved from the prefetcher.
+type subfetcher struct {
+	db   Database    // Database to load trie nodes through
+	root common.Hash // Root hash of the trie to prefetch
+	trie Trie        // Trie being populated with nodes
+
+	tasks [][]byte   // Items queued up for retrieval
+	lock  sync.Mutex // Lock protecting the task queue
+
+	wake chan struct{}  // Wake channel if a new task is scheduled
+	stop chan struct{}  // Channel to interrupt processing
+	term chan struct{}  // Channel to signal interruption
+	copy chan chan Trie // Channel to request a copy of the current trie
+
+	seen map[string]struct{} // Tracks the entries already loaded
+	dups int                 // Number of duplicate preload tasks
+	used [][]byte            // Tracks the entries used in the end
+}
+
+// newSubfetcher creates a goroutine to prefetch state items belonging to a
+// particular root hash.
+func newSubfetcher(db Database, root common.Hash) *subfetcher { + sf := &subfetcher{ + db: db, + root: root, + wake: make(chan struct{}, 1), + stop: make(chan struct{}), + term: make(chan struct{}), + copy: make(chan chan Trie), + seen: make(map[string]struct{}), + } + go sf.loop() + return sf +} + +// schedule adds a batch of trie keys to the queue to prefetch. +func (sf *subfetcher) schedule(keys [][]byte) { + // Append the tasks to the current queue + sf.lock.Lock() + sf.tasks = append(sf.tasks, keys...) + sf.lock.Unlock() + + // Notify the prefetcher, it's fine if it's already terminated + select { + case sf.wake <- struct{}{}: + default: + } +} + +// peek tries to retrieve a deep copy of the fetcher's trie in whatever form it +// is currently. +func (sf *subfetcher) peek() Trie { + ch := make(chan Trie) + select { + case sf.copy <- ch: + // Subfetcher still alive, return copy from it + return <-ch + + case <-sf.term: + // Subfetcher already terminated, return a copy directly + if sf.trie == nil { + return nil + } + return sf.db.CopyTrie(sf.trie) + } +} + +// abort interrupts the subfetcher immediately. It is safe to call abort multiple +// times but it is not thread safe. +func (sf *subfetcher) abort() { + select { + case <-sf.stop: + default: + close(sf.stop) + } + <-sf.term +} + +// loop waits for new tasks to be scheduled and keeps loading them until it runs +// out of tasks or its underlying trie is retrieved for committing. +func (sf *subfetcher) loop() { + // No matter how the loop stops, signal anyone waiting that it's terminated + defer close(sf.term) + + // Start by opening the trie and stop processing if it fails + trie, err := sf.db.OpenTrie(sf.root) + if err != nil { + log.Warn("Trie prefetcher failed opening trie", "root", sf.root, "err", err) + return + } + sf.trie = trie + + // Trie opened successfully, keep prefetching items + for { + select { + case <-sf.wake: + // Subfetcher was woken up, retrieve any tasks to avoid spinning the lock + sf.lock.Lock() + tasks := sf.tasks + sf.tasks = nil + sf.lock.Unlock() + + // Prefetch any tasks until the loop is interrupted + for i, task := range tasks { + select { + case <-sf.stop: + // If termination is requested, add any leftover back and return + sf.lock.Lock() + sf.tasks = append(sf.tasks, tasks[i:]...) 
+ sf.lock.Unlock() + return + + case ch := <-sf.copy: + // Somebody wants a copy of the current trie, grant them + ch <- sf.db.CopyTrie(sf.trie) + + default: + // No termination request yet, prefetch the next entry + taskid := string(task) + if _, ok := sf.seen[taskid]; ok { + sf.dups++ + } else { + sf.trie.TryGet(task) + sf.seen[taskid] = struct{}{} + } + } + } + + case ch := <-sf.copy: + // Somebody wants a copy of the current trie, grant them + ch <- sf.db.CopyTrie(sf.trie) + + case <-sf.stop: + // Termination is requested, abort and leave remaining tasks + return + } + } +} diff --git a/eth/api_tracer.go b/eth/api_tracer.go index 2785b68450b3..80097c86d3e4 100644 --- a/eth/api_tracer.go +++ b/eth/api_tracer.go @@ -331,7 +331,8 @@ func (api *DebugAPI) traceChain(ctx context.Context, start, end *types.Block, co failed = err break } - if err := statedb.Reset(root); err != nil { + statedb, err = state.New(root, database) + if err != nil { failed = err break } @@ -601,7 +602,8 @@ func (api *DebugAPI) computeStateDB(block *types.Block, reexec uint64) (*state.S if err != nil { return nil, nil, err } - if err := statedb.Reset(root); err != nil { + statedb, err = state.New(root, database) + if err != nil { return nil, nil, err } database.TrieDB().Reference(root, common.Hash{}) diff --git a/miner/miner.go b/miner/miner.go index f48df9491d41..d9bda24ccf9b 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -51,15 +51,14 @@ type Backend interface { // Miner creates blocks and searches for proof-of-work values. type Miner struct { - mux *event.TypeMux - - worker *worker - + mux *event.TypeMux + worker *worker coinbase common.Address - mining int32 eth Backend engine consensus.Engine + exitCh chan struct{} + mining int32 canStart int32 // can start indicates whether we can start the mining operation shouldStart int32 // should start indicates whether we should start after sync } @@ -69,6 +68,7 @@ func New(eth Backend, config *params.ChainConfig, mux *event.TypeMux, engine con eth: eth, mux: mux, engine: engine, + exitCh: make(chan struct{}), worker: newWorker(config, engine, common.Address{}, eth, mux, announceTxs), canStart: 1, } @@ -84,23 +84,40 @@ func New(eth Backend, config *params.ChainConfig, mux *event.TypeMux, engine con // and halt your mining operation for as long as the DOS continues. 
 func (m *Miner) update() {
 	events := m.mux.Subscribe(downloader.StartEvent{}, downloader.DoneEvent{}, downloader.FailedEvent{})
-	for ev := range events.Chan() {
-		switch ev.Data.(type) {
-		case downloader.StartEvent:
-			atomic.StoreInt32(&m.canStart, 0)
-			if m.Mining() {
-				m.Stop()
-				atomic.StoreInt32(&m.shouldStart, 1)
-				log.Info("Mining aborted due to sync")
-			}
-		case downloader.DoneEvent, downloader.FailedEvent:
-			shouldStart := atomic.LoadInt32(&m.shouldStart) == 1
+	defer func() {
+		if !events.Closed() {
+			events.Unsubscribe()
+		}
+	}()
 
-			atomic.StoreInt32(&m.canStart, 1)
-			atomic.StoreInt32(&m.shouldStart, 0)
-			if shouldStart {
-				m.Start(m.coinbase)
+	for {
+		select {
+		case ev := <-events.Chan():
+			if ev == nil {
+				return
+			}
+			switch ev.Data.(type) {
+			case downloader.StartEvent:
+				atomic.StoreInt32(&m.canStart, 0)
+				if m.Mining() {
+					m.Stop()
+					atomic.StoreInt32(&m.shouldStart, 1)
+					log.Info("Mining aborted due to sync")
+				}
+			case downloader.DoneEvent, downloader.FailedEvent:
+				shouldStart := atomic.LoadInt32(&m.shouldStart) == 1
+
+				atomic.StoreInt32(&m.canStart, 1)
+				atomic.StoreInt32(&m.shouldStart, 0)
+				if shouldStart {
+					m.Start(m.coinbase)
+				}
+				// stop immediately and ignore all further pending events
+				return
 			}
+		case <-m.exitCh:
+			m.worker.close()
+			return
 		}
 	}
 }
@@ -121,6 +138,7 @@ func (m *Miner) Start(coinbase common.Address) {
 }
 
 func (m *Miner) Stop() {
+	close(m.exitCh)
 	m.worker.stop()
 	atomic.StoreInt32(&m.mining, 0)
 	atomic.StoreInt32(&m.shouldStart, 0)
diff --git a/miner/worker.go b/miner/worker.go
index 1d6ef3278463..a0e736c793e0 100644
--- a/miner/worker.go
+++ b/miner/worker.go
@@ -110,7 +110,7 @@ type worker struct {
 	// Feeds
 	pendingLogsFeed event.Feed
 
-	// update loop
+	// Subscriptions
 	mux    *event.TypeMux
 	txsCh  chan core.NewTxsEvent
 	txsSub event.Subscription
@@ -253,6 +253,14 @@ func (w *worker) stop() {
 	atomic.StoreInt32(&w.atWork, 0)
 }
 
+// close terminates all background threads maintained by the worker.
+// Note the worker does not support being closed multiple times.
+func (w *worker) close() {
+	if w.current != nil && w.current.state != nil {
+		w.current.state.StopPrefetcher()
+	}
+}
+
 func (w *worker) register(agent Agent) {
 	w.mu.Lock()
 	defer w.mu.Unlock()
@@ -551,6 +559,7 @@ func (w *worker) makeCurrent(parent *types.Block, header *types.Header) error {
 	if err != nil {
 		return err
 	}
+	state.StartPrefetcher("miner")
 	author, _ := w.chain.Engine().Author(parent.Header())
 
 	var XDCxState *tradingstate.TradingStateDB
@@ -586,6 +595,12 @@ func (w *worker) makeCurrent(parent *types.Block, header *types.Header) error {
 
 	// Keep track of transactions which return errors so they can be removed
 	work.tcount = 0
+
+	// Swap out the old work with the new one, terminating any leftover prefetcher
+	// processes in the meantime and starting a new one.
+	if w.current != nil && w.current.state != nil {
+		w.current.state.StopPrefetcher()
+	}
 	w.current = work
 	return nil
 }
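Finally, to summarize how the new pieces interact end to end: the sketch below is illustrative only, written as if it lived inside package state since the prefetcher types are unexported. The function and its arguments are assumptions; the calls on `p` are the API added in core/state/trie_prefetcher.go.

```go
package state

import "github.com/XinFinOrg/XDPoSChain/common"

// prefetcherLifecycle sketches the data flow of the types added in
// core/state/trie_prefetcher.go. The function and its arguments are
// hypothetical; the calls on p are the real (unexported) API.
func prefetcherLifecycle(db Database, stateRoot, addrHash common.Hash) {
	p := newTriePrefetcher(db, stateRoot, "example")

	// During execution: queue keys for background loading. Each distinct
	// trie root gets its own subfetcher goroutine.
	p.prefetch(stateRoot, [][]byte{addrHash.Bytes()})

	// At hashing time: adopt the (partially) warmed trie. This aborts the
	// subfetcher for that root, so it must follow all prefetch calls.
	if tr := p.trie(stateRoot); tr != nil {
		_ = tr // the caller hashes/commits against this copy
	}
	// Report which keys were actually needed; close diffs them against what
	// was loaded to feed the load/dup/skip/waste meters.
	p.used(stateRoot, [][]byte{addrHash.Bytes()})

	// close aborts any subfetchers still spinning and flushes the metrics.
	// The prefetcher must not be reused afterwards (fetchers is nil-ed).
	p.close()
}
```

An active prefetcher duplicated via copy(), as StateDB.Copy does, becomes read-only: prefetch turns into a no-op and trie serves deep copies of whatever had been loaded at copy time.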