Skip to content

Commit

Permalink
[R4R] Pipecommit enable trie prefetcher (#992)
Browse files Browse the repository at this point in the history
  • Loading branch information
qinglin89 authored and brilliant-lx committed Aug 1, 2022
1 parent 77c8372 commit 76e3c90
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 65 deletions.
13 changes: 2 additions & 11 deletions core/state/snapshot/difflayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,8 @@ type diffLayer struct {
storageList map[common.Hash][]common.Hash // List of storage slots for iterated retrievals, one per account. Any existing lists are sorted if non-nil
storageData map[common.Hash]map[common.Hash][]byte // Keyed storage slots for direct retrieval. one per account (nil means deleted)

verifiedCh chan struct{} // the difflayer is verified when verifiedCh is nil or closed
valid bool // mark the difflayer is valid or not.
accountCorrected bool // mark the accountData has been corrected ort not
verifiedCh chan struct{} // the difflayer is verified when verifiedCh is nil or closed
valid bool // mark the difflayer is valid or not.

diffed *bloomfilter.Filter // Bloom filter tracking all the diffed items up to the disk layer

Expand Down Expand Up @@ -294,14 +293,6 @@ func (dl *diffLayer) CorrectAccounts(accounts map[common.Hash][]byte) {
defer dl.lock.Unlock()

dl.accountData = accounts
dl.accountCorrected = true
}

func (dl *diffLayer) AccountsCorrected() bool {
dl.lock.RLock()
defer dl.lock.RUnlock()

return dl.accountCorrected
}

// Parent returns the subsequent layer of a diff layer.
Expand Down
4 changes: 0 additions & 4 deletions core/state/snapshot/disklayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,6 @@ func (dl *diskLayer) Verified() bool {
func (dl *diskLayer) CorrectAccounts(map[common.Hash][]byte) {
}

func (dl *diskLayer) AccountsCorrected() bool {
return true
}

// Parent always returns nil as there's no layer below the disk.
func (dl *diskLayer) Parent() snapshot {
return nil
Expand Down
14 changes: 4 additions & 10 deletions core/state/snapshot/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,6 @@ type Snapshot interface {
// CorrectAccounts updates account data for storing the correct data during pipecommit
CorrectAccounts(map[common.Hash][]byte)

// AccountsCorrected checks whether the account data has been corrected during pipecommit
AccountsCorrected() bool

// Account directly retrieves the account associated with a particular hash in
// the snapshot slim data format.
Account(hash common.Hash) (*Account, error)
Expand All @@ -131,20 +128,17 @@ type Snapshot interface {
// Storage directly retrieves the storage data associated with a particular hash,
// within a particular account.
Storage(accountHash, storageHash common.Hash) ([]byte, error)

// Parent returns the subsequent layer of a snapshot, or nil if the base was
// reached.
Parent() snapshot
}

// snapshot is the internal version of the snapshot data layer that supports some
// additional methods compared to the public API.
type snapshot interface {
Snapshot

// Parent returns the subsequent layer of a snapshot, or nil if the base was
// reached.
//
// Note, the method is an internal helper to avoid type switching between the
// disk and diff layers. There is no locking involved.
Parent() snapshot

// Update creates a new layer on top of the existing snapshot diff tree with
// the specified data items.
//
Expand Down
12 changes: 1 addition & 11 deletions core/state/state_object.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,18 +351,8 @@ func (s *StateObject) finalise(prefetch bool) {
}
}

// The account root need to be updated before prefetch, otherwise the account root is empty
if s.db.pipeCommit && s.data.Root == dummyRoot && !s.rootCorrected && s.db.snap.AccountsCorrected() {
if acc, err := s.db.snap.Account(crypto.HashData(s.db.hasher, s.address.Bytes())); err == nil {
if acc != nil && len(acc.Root) != 0 {
s.data.Root = common.BytesToHash(acc.Root)
s.rootCorrected = true
}
}
}

prefetcher := s.db.prefetcher
if prefetcher != nil && prefetch && len(slotsToPrefetch) > 0 && s.data.Root != emptyRoot && s.data.Root != dummyRoot {
if prefetcher != nil && prefetch && len(slotsToPrefetch) > 0 && s.data.Root != emptyRoot {
prefetcher.prefetch(s.data.Root, slotsToPrefetch, s.addrHash)
}
if len(s.dirtyStorage) > 0 {
Expand Down
32 changes: 17 additions & 15 deletions core/state/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,6 @@ var (
// emptyRoot is the known root hash of an empty trie.
emptyRoot = common.HexToHash("56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421")

// dummyRoot is the dummy account root before corrected in pipecommit sync mode,
// the value is 542e5fc2709de84248e9bce43a9c0c8943a608029001360f8ab55bf113b23d28
dummyRoot = crypto.Keccak256Hash([]byte("dummy_account_root"))

emptyAddr = crypto.Keccak256Hash(common.Address{}.Bytes())
)

Expand Down Expand Up @@ -218,7 +214,12 @@ func (s *StateDB) StartPrefetcher(namespace string) {
s.prefetcher = nil
}
if s.snap != nil {
s.prefetcher = newTriePrefetcher(s.db, s.originalRoot, namespace)
parent := s.snap.Parent()
if parent != nil {
s.prefetcher = newTriePrefetcher(s.db, s.originalRoot, parent.Root(), namespace)
} else {
s.prefetcher = newTriePrefetcher(s.db, s.originalRoot, common.Hash{}, namespace)
}
}
}

Expand Down Expand Up @@ -1000,7 +1001,11 @@ func (s *StateDB) Finalise(deleteEmptyObjects bool) {
}
prefetcher := s.prefetcher
if prefetcher != nil && len(addressesToPrefetch) > 0 {
prefetcher.prefetch(s.originalRoot, addressesToPrefetch, emptyAddr)
if s.snap.Verified() {
prefetcher.prefetch(s.originalRoot, addressesToPrefetch, emptyAddr)
} else if prefetcher.rootParent != (common.Hash{}) {
prefetcher.prefetch(prefetcher.rootParent, addressesToPrefetch, emptyAddr)
}
}
// Invalidate journal because reverting across transactions is not allowed.
s.clearJournalAndRefund()
Expand Down Expand Up @@ -1035,11 +1040,12 @@ func (s *StateDB) CorrectAccountsRoot(blockRoot common.Hash) {
}
if accounts, err := snapshot.Accounts(); err == nil && accounts != nil {
for _, obj := range s.stateObjects {
if !obj.deleted && !obj.rootCorrected && obj.data.Root == dummyRoot {
if !obj.deleted {
if account, exist := accounts[crypto.Keccak256Hash(obj.address[:])]; exist {
obj.data.Root = common.BytesToHash(account.Root)
if obj.data.Root == (common.Hash{}) {
if len(account.Root) == 0 {
obj.data.Root = emptyRoot
} else {
obj.data.Root = common.BytesToHash(account.Root)
}
obj.rootCorrected = true
}
Expand All @@ -1053,12 +1059,8 @@ func (s *StateDB) PopulateSnapAccountAndStorage() {
for addr := range s.stateObjectsPending {
if obj := s.stateObjects[addr]; !obj.deleted {
if s.snap != nil {
root := obj.data.Root
storageChanged := s.populateSnapStorage(obj)
if storageChanged {
root = dummyRoot
}
s.snapAccounts[obj.address] = snapshot.SlimAccountRLP(obj.data.Nonce, obj.data.Balance, root, obj.data.CodeHash)
s.populateSnapStorage(obj)
s.snapAccounts[obj.address] = snapshot.SlimAccountRLP(obj.data.Nonce, obj.data.Balance, obj.data.Root, obj.data.CodeHash)
}
}
}
Expand Down
48 changes: 37 additions & 11 deletions core/state/trie_prefetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,11 @@ type prefetchMsg struct {
//
// Note, the prefetcher's API is not thread safe.
type triePrefetcher struct {
db Database // Database to fetch trie nodes through
root common.Hash // Root hash of theaccount trie for metrics
fetches map[common.Hash]Trie // Partially or fully fetcher tries
fetchers map[common.Hash]*subfetcher // Subfetchers for each trie
db Database // Database to fetch trie nodes through
root common.Hash // Root hash of theaccount trie for metrics
rootParent common.Hash //Root has of the account trie from block before the prvious one, designed for pipecommit mode
fetches map[common.Hash]Trie // Partially or fully fetcher tries
fetchers map[common.Hash]*subfetcher // Subfetchers for each trie

abortChan chan *subfetcher // to abort a single subfetcher and its children
closed int32
Expand All @@ -70,16 +71,22 @@ type triePrefetcher struct {
storageDupMeter metrics.Meter
storageSkipMeter metrics.Meter
storageWasteMeter metrics.Meter

accountStaleLoadMeter metrics.Meter
accountStaleDupMeter metrics.Meter
accountStaleSkipMeter metrics.Meter
accountStaleWasteMeter metrics.Meter
}

// newTriePrefetcher
func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePrefetcher {
func newTriePrefetcher(db Database, root, rootParent common.Hash, namespace string) *triePrefetcher {
prefix := triePrefetchMetricsPrefix + namespace
p := &triePrefetcher{
db: db,
root: root,
fetchers: make(map[common.Hash]*subfetcher), // Active prefetchers use the fetchers map
abortChan: make(chan *subfetcher, abortChanSize),
db: db,
root: root,
rootParent: rootParent,
fetchers: make(map[common.Hash]*subfetcher), // Active prefetchers use the fetchers map
abortChan: make(chan *subfetcher, abortChanSize),

closeMainChan: make(chan struct{}),
closeMainDoneChan: make(chan struct{}),
Expand All @@ -94,6 +101,11 @@ func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePre
storageDupMeter: metrics.GetOrRegisterMeter(prefix+"/storage/dup", nil),
storageSkipMeter: metrics.GetOrRegisterMeter(prefix+"/storage/skip", nil),
storageWasteMeter: metrics.GetOrRegisterMeter(prefix+"/storage/waste", nil),

accountStaleLoadMeter: metrics.GetOrRegisterMeter(prefix+"/accountst/load", nil),
accountStaleDupMeter: metrics.GetOrRegisterMeter(prefix+"/accountst/dup", nil),
accountStaleSkipMeter: metrics.GetOrRegisterMeter(prefix+"/accountst/skip", nil),
accountStaleWasteMeter: metrics.GetOrRegisterMeter(prefix+"/accountst/waste", nil),
}
go p.mainLoop()
return p
Expand Down Expand Up @@ -144,7 +156,8 @@ func (p *triePrefetcher) mainLoop() {
}

if metrics.EnabledExpensive {
if fetcher.root == p.root {
switch fetcher.root {
case p.root:
p.accountLoadMeter.Mark(int64(len(fetcher.seen)))
p.accountDupMeter.Mark(int64(fetcher.dups))
p.accountSkipMeter.Mark(int64(len(fetcher.tasks)))
Expand All @@ -154,7 +167,19 @@ func (p *triePrefetcher) mainLoop() {
}
fetcher.lock.Unlock()
p.accountWasteMeter.Mark(int64(len(fetcher.seen)))
} else {

case p.rootParent:
p.accountStaleLoadMeter.Mark(int64(len(fetcher.seen)))
p.accountStaleDupMeter.Mark(int64(fetcher.dups))
p.accountStaleSkipMeter.Mark(int64(len(fetcher.tasks)))
fetcher.lock.Lock()
for _, key := range fetcher.used {
delete(fetcher.seen, string(key))
}
fetcher.lock.Unlock()
p.accountStaleWasteMeter.Mark(int64(len(fetcher.seen)))

default:
p.storageLoadMeter.Mark(int64(len(fetcher.seen)))
p.storageDupMeter.Mark(int64(fetcher.dups))
p.storageSkipMeter.Mark(int64(len(fetcher.tasks)))
Expand All @@ -165,6 +190,7 @@ func (p *triePrefetcher) mainLoop() {
}
fetcher.lock.Unlock()
p.storageWasteMeter.Mark(int64(len(fetcher.seen)))

}
}
}
Expand Down
6 changes: 3 additions & 3 deletions core/state/trie_prefetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func prefetchGuaranteed(prefetcher *triePrefetcher, root common.Hash, keys [][]b

func TestCopyAndClose(t *testing.T) {
db := filledStateDB()
prefetcher := newTriePrefetcher(db.db, db.originalRoot, "")
prefetcher := newTriePrefetcher(db.db, db.originalRoot, common.Hash{}, "")
skey := common.HexToHash("aaa")
prefetchGuaranteed(prefetcher, db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{})
prefetchGuaranteed(prefetcher, db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{})
Expand All @@ -80,7 +80,7 @@ func TestCopyAndClose(t *testing.T) {

func TestUseAfterClose(t *testing.T) {
db := filledStateDB()
prefetcher := newTriePrefetcher(db.db, db.originalRoot, "")
prefetcher := newTriePrefetcher(db.db, db.originalRoot, common.Hash{}, "")
skey := common.HexToHash("aaa")
prefetchGuaranteed(prefetcher, db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{})
a := prefetcher.trie(db.originalRoot)
Expand All @@ -96,7 +96,7 @@ func TestUseAfterClose(t *testing.T) {

func TestCopyClose(t *testing.T) {
db := filledStateDB()
prefetcher := newTriePrefetcher(db.db, db.originalRoot, "")
prefetcher := newTriePrefetcher(db.db, db.originalRoot, common.Hash{}, "")
skey := common.HexToHash("aaa")
prefetchGuaranteed(prefetcher, db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{})
cpy := prefetcher.copy()
Expand Down

0 comments on commit 76e3c90

Please sign in to comment.